/*-------------------------------------------------------------------------
 *
 * relcache.c
 *	  POSTGRES relation descriptor cache code
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/utils/cache/relcache.c
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *		RelationCacheInitialize			- initialize relcache (to empty)
 *		RelationCacheInitializePhase2	- initialize shared-catalog entries
 *		RelationCacheInitializePhase3	- finish initializing relcache
 *		RelationIdGetRelation			- get a reldesc by relation id
 *		RelationClose					- close an open relation
 *
 * NOTES
 *		The following code contains many undocumented hacks.  Please be
 *		careful....
 */
27 #include "postgres.h"
28 
29 #include <sys/file.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 
33 #include "access/htup_details.h"
34 #include "access/multixact.h"
35 #include "access/nbtree.h"
36 #include "access/parallel.h"
37 #include "access/reloptions.h"
38 #include "access/sysattr.h"
39 #include "access/table.h"
40 #include "access/tableam.h"
41 #include "access/tupdesc_details.h"
42 #include "access/xact.h"
43 #include "access/xlog.h"
44 #include "catalog/catalog.h"
45 #include "catalog/indexing.h"
46 #include "catalog/namespace.h"
47 #include "catalog/partition.h"
48 #include "catalog/pg_am.h"
49 #include "catalog/pg_amproc.h"
50 #include "catalog/pg_attrdef.h"
51 #include "catalog/pg_auth_members.h"
52 #include "catalog/pg_authid.h"
53 #include "catalog/pg_constraint.h"
54 #include "catalog/pg_database.h"
55 #include "catalog/pg_namespace.h"
56 #include "catalog/pg_opclass.h"
57 #include "catalog/pg_proc.h"
58 #include "catalog/pg_publication.h"
59 #include "catalog/pg_rewrite.h"
60 #include "catalog/pg_shseclabel.h"
61 #include "catalog/pg_statistic_ext.h"
62 #include "catalog/pg_subscription.h"
63 #include "catalog/pg_tablespace.h"
64 #include "catalog/pg_trigger.h"
65 #include "catalog/pg_type.h"
66 #include "catalog/schemapg.h"
67 #include "catalog/storage.h"
68 #include "commands/policy.h"
69 #include "commands/trigger.h"
70 #include "miscadmin.h"
71 #include "nodes/makefuncs.h"
72 #include "nodes/nodeFuncs.h"
73 #include "optimizer/optimizer.h"
74 #include "rewrite/rewriteDefine.h"
75 #include "rewrite/rowsecurity.h"
76 #include "storage/lmgr.h"
77 #include "storage/smgr.h"
78 #include "utils/array.h"
79 #include "utils/builtins.h"
80 #include "utils/datum.h"
81 #include "utils/fmgroids.h"
82 #include "utils/inval.h"
83 #include "utils/lsyscache.h"
84 #include "utils/memutils.h"
85 #include "utils/relmapper.h"
86 #include "utils/resowner_private.h"
87 #include "utils/snapmgr.h"
88 #include "utils/syscache.h"
89 
#define RELCACHE_INIT_FILEMAGIC		0x573266	/* version ID value */

/*
 * Default policy for whether to apply RECOVER_RELATION_BUILD_MEMORY:
 * do so in clobber-cache builds but not otherwise.  This choice can be
 * overridden at compile time with -DRECOVER_RELATION_BUILD_MEMORY=1 or =0.
 */
#ifndef RECOVER_RELATION_BUILD_MEMORY
#if defined(CLOBBER_CACHE_ALWAYS) || defined(CLOBBER_CACHE_RECURSIVELY)
#define RECOVER_RELATION_BUILD_MEMORY 1
#else
#define RECOVER_RELATION_BUILD_MEMORY 0
#endif
#endif
104 
105 /*
106  *		hardcoded tuple descriptors, contents generated by genbki.pl
107  */
108 static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
109 static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
110 static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
111 static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
112 static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
113 static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
114 static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
115 static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
116 static const FormData_pg_attribute Desc_pg_shseclabel[Natts_pg_shseclabel] = {Schema_pg_shseclabel};
117 static const FormData_pg_attribute Desc_pg_subscription[Natts_pg_subscription] = {Schema_pg_subscription};
118 
119 /*
120  *		Hash tables that index the relation cache
121  *
122  *		We used to index the cache by both name and OID, but now there
123  *		is only an index by OID.
124  */
125 typedef struct relidcacheent
126 {
127 	Oid			reloid;
128 	Relation	reldesc;
129 } RelIdCacheEnt;
130 
131 static HTAB *RelationIdCache;
132 
133 /*
134  * This flag is false until we have prepared the critical relcache entries
135  * that are needed to do indexscans on the tables read by relcache building.
136  */
137 bool		criticalRelcachesBuilt = false;
138 
139 /*
140  * This flag is false until we have prepared the critical relcache entries
141  * for shared catalogs (which are the tables needed for login).
142  */
143 bool		criticalSharedRelcachesBuilt = false;
144 
145 /*
146  * This counter counts relcache inval events received since backend startup
147  * (but only for rels that are actually in cache).  Presently, we use it only
148  * to detect whether data about to be written by write_relcache_init_file()
149  * might already be obsolete.
150  */
151 static long relcacheInvalsReceived = 0L;
152 
153 /*
154  * in_progress_list is a stack of ongoing RelationBuildDesc() calls.  CREATE
155  * INDEX CONCURRENTLY makes catalog changes under ShareUpdateExclusiveLock.
156  * It critically relies on each backend absorbing those changes no later than
157  * next transaction start.  Hence, RelationBuildDesc() loops until it finishes
158  * without accepting a relevant invalidation.  (Most invalidation consumers
159  * don't do this.)
160  */
161 typedef struct inprogressent
162 {
163 	Oid			reloid;			/* OID of relation being built */
164 	bool		invalidated;	/* whether an invalidation arrived for it */
165 } InProgressEnt;
166 
167 static InProgressEnt *in_progress_list;
168 static int	in_progress_list_len;
169 static int	in_progress_list_maxlen;
170 
171 /*
172  * eoxact_list[] stores the OIDs of relations that (might) need AtEOXact
173  * cleanup work.  This list intentionally has limited size; if it overflows,
174  * we fall back to scanning the whole hashtable.  There is no value in a very
175  * large list because (1) at some point, a hash_seq_search scan is faster than
176  * retail lookups, and (2) the value of this is to reduce EOXact work for
177  * short transactions, which can't have dirtied all that many tables anyway.
178  * EOXactListAdd() does not bother to prevent duplicate list entries, so the
179  * cleanup processing must be idempotent.
180  */
181 #define MAX_EOXACT_LIST 32
182 static Oid	eoxact_list[MAX_EOXACT_LIST];
183 static int	eoxact_list_len = 0;
184 static bool eoxact_list_overflowed = false;
185 
186 #define EOXactListAdd(rel) \
187 	do { \
188 		if (eoxact_list_len < MAX_EOXACT_LIST) \
189 			eoxact_list[eoxact_list_len++] = (rel)->rd_id; \
190 		else \
191 			eoxact_list_overflowed = true; \
192 	} while (0)
193 
194 /*
195  * EOXactTupleDescArray stores TupleDescs that (might) need AtEOXact
196  * cleanup work.  The array expands as needed; there is no hashtable because
197  * we don't need to access individual items except at EOXact.
198  */
199 static TupleDesc *EOXactTupleDescArray;
200 static int	NextEOXactTupleDescNum = 0;
201 static int	EOXactTupleDescArrayLen = 0;
202 
/*
 *		macros to manipulate the lookup hashtable
 */

/*
 * RelationCacheInsert
 *		Enter RELATION into RelationIdCache, keyed by its rd_id.
 *
 * If an entry for that OID already exists, replace_allowed must be true
 * (checked by Assert).  The new descriptor is installed first; the old one
 * is then destroyed if its reference count is zero, otherwise a WARNING
 * about the leak is emitted (except in bootstrap processing mode).
 */
#define RelationCacheInsert(RELATION, replace_allowed)	\
do { \
	RelIdCacheEnt *hentry; bool found; \
	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										   (void *) &((RELATION)->rd_id), \
										   HASH_ENTER, &found); \
	if (found) \
	{ \
		/* see comments in RelationBuildDesc and RelationBuildLocalRelation */ \
		Relation _old_rel = hentry->reldesc; \
		Assert(replace_allowed); \
		hentry->reldesc = (RELATION); \
		if (RelationHasReferenceCountZero(_old_rel)) \
			RelationDestroyRelation(_old_rel, false); \
		else if (!IsBootstrapProcessingMode()) \
			elog(WARNING, "leaking still-referenced relcache entry for \"%s\"", \
				 RelationGetRelationName(_old_rel)); \
	} \
	else \
		hentry->reldesc = (RELATION); \
} while(0)
227 
/*
 * RelationIdCacheLookup
 *		Look up ID in RelationIdCache and assign the cached descriptor to
 *		RELATION, or NULL if there is no entry.  ID must be an lvalue since
 *		its address is passed to hash_search().
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										   (void *) &(ID), \
										   HASH_FIND, NULL); \
	if (hentry) \
		RELATION = hentry->reldesc; \
	else \
		RELATION = NULL; \
} while(0)
239 
/*
 * RelationCacheDelete
 *		Remove RELATION's entry (keyed by rd_id) from RelationIdCache.
 *		A missing entry is reported with a WARNING rather than an error.
 */
#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										   (void *) &((RELATION)->rd_id), \
										   HASH_REMOVE, NULL); \
	if (hentry == NULL) \
		elog(WARNING, "failed to delete relcache entry for OID %u", \
			 (RELATION)->rd_id); \
} while(0)
250 
251 
252 /*
253  * Special cache for opclass-related information
254  *
255  * Note: only default support procs get cached, ie, those with
256  * lefttype = righttype = opcintype.
257  */
258 typedef struct opclasscacheent
259 {
260 	Oid			opclassoid;		/* lookup key: OID of opclass */
261 	bool		valid;			/* set true after successful fill-in */
262 	StrategyNumber numSupport;	/* max # of support procs (from pg_am) */
263 	Oid			opcfamily;		/* OID of opclass's family */
264 	Oid			opcintype;		/* OID of opclass's declared input type */
265 	RegProcedure *supportProcs; /* OIDs of support procedures */
266 } OpClassCacheEnt;
267 
268 static HTAB *OpClassCache = NULL;
269 
270 
271 /* non-export function prototypes */
272 
273 static void RelationDestroyRelation(Relation relation, bool remember_tupdesc);
274 static void RelationClearRelation(Relation relation, bool rebuild);
275 
276 static void RelationReloadIndexInfo(Relation relation);
277 static void RelationReloadNailed(Relation relation);
278 static void RelationFlushRelation(Relation relation);
279 static void RememberToFreeTupleDescAtEOX(TupleDesc td);
280 #ifdef USE_ASSERT_CHECKING
281 static void AssertPendingSyncConsistency(Relation relation);
282 #endif
283 static void AtEOXact_cleanup(Relation relation, bool isCommit);
284 static void AtEOSubXact_cleanup(Relation relation, bool isCommit,
285 								SubTransactionId mySubid, SubTransactionId parentSubid);
286 static bool load_relcache_init_file(bool shared);
287 static void write_relcache_init_file(bool shared);
288 static void write_item(const void *data, Size len, FILE *fp);
289 
290 static void formrdesc(const char *relationName, Oid relationReltype,
291 					  bool isshared, int natts, const FormData_pg_attribute *attrs);
292 
293 static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic);
294 static Relation AllocateRelationDesc(Form_pg_class relp);
295 static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
296 static void RelationBuildTupleDesc(Relation relation);
297 static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
298 static void RelationInitPhysicalAddr(Relation relation);
299 static void load_critical_index(Oid indexoid, Oid heapoid);
300 static TupleDesc GetPgClassDescriptor(void);
301 static TupleDesc GetPgIndexDescriptor(void);
302 static void AttrDefaultFetch(Relation relation);
303 static void CheckConstraintFetch(Relation relation);
304 static int	CheckConstraintCmp(const void *a, const void *b);
305 static void InitIndexAmRoutine(Relation relation);
306 static void IndexSupportInitialize(oidvector *indclass,
307 								   RegProcedure *indexSupport,
308 								   Oid *opFamily,
309 								   Oid *opcInType,
310 								   StrategyNumber maxSupportNumber,
311 								   AttrNumber maxAttributeNumber);
312 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
313 										  StrategyNumber numSupport);
314 static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
315 static void unlink_initfile(const char *initfilename, int elevel);
316 
317 
318 /*
319  *		ScanPgRelation
320  *
321  *		This is used by RelationBuildDesc to find a pg_class
322  *		tuple matching targetRelId.  The caller must hold at least
323  *		AccessShareLock on the target relid to prevent concurrent-update
324  *		scenarios; it isn't guaranteed that all scans used to build the
325  *		relcache entry will use the same snapshot.  If, for example,
326  *		an attribute were to be added after scanning pg_class and before
327  *		scanning pg_attribute, relnatts wouldn't match.
328  *
329  *		NB: the returned tuple has been copied into palloc'd storage
330  *		and must eventually be freed with heap_freetuple.
331  */
332 static HeapTuple
ScanPgRelation(Oid targetRelId,bool indexOK,bool force_non_historic)333 ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic)
334 {
335 	HeapTuple	pg_class_tuple;
336 	Relation	pg_class_desc;
337 	SysScanDesc pg_class_scan;
338 	ScanKeyData key[1];
339 	Snapshot	snapshot = NULL;
340 
341 	/*
342 	 * If something goes wrong during backend startup, we might find ourselves
343 	 * trying to read pg_class before we've selected a database.  That ain't
344 	 * gonna work, so bail out with a useful error message.  If this happens,
345 	 * it probably means a relcache entry that needs to be nailed isn't.
346 	 */
347 	if (!OidIsValid(MyDatabaseId))
348 		elog(FATAL, "cannot read pg_class without having selected a database");
349 
350 	/*
351 	 * form a scan key
352 	 */
353 	ScanKeyInit(&key[0],
354 				Anum_pg_class_oid,
355 				BTEqualStrategyNumber, F_OIDEQ,
356 				ObjectIdGetDatum(targetRelId));
357 
358 	/*
359 	 * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
360 	 * built the critical relcache entries (this includes initdb and startup
361 	 * without a pg_internal.init file).  The caller can also force a heap
362 	 * scan by setting indexOK == false.
363 	 */
364 	pg_class_desc = table_open(RelationRelationId, AccessShareLock);
365 
366 	/*
367 	 * The caller might need a tuple that's newer than the one the historic
368 	 * snapshot; currently the only case requiring to do so is looking up the
369 	 * relfilenode of non mapped system relations during decoding. That
370 	 * snapshot can't change in the midst of a relcache build, so there's no
371 	 * need to register the snapshot.
372 	 */
373 	if (force_non_historic)
374 		snapshot = GetNonHistoricCatalogSnapshot(RelationRelationId);
375 
376 	pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
377 									   indexOK && criticalRelcachesBuilt,
378 									   snapshot,
379 									   1, key);
380 
381 	pg_class_tuple = systable_getnext(pg_class_scan);
382 
383 	/*
384 	 * Must copy tuple before releasing buffer.
385 	 */
386 	if (HeapTupleIsValid(pg_class_tuple))
387 		pg_class_tuple = heap_copytuple(pg_class_tuple);
388 
389 	/* all done */
390 	systable_endscan(pg_class_scan);
391 	table_close(pg_class_desc, AccessShareLock);
392 
393 	return pg_class_tuple;
394 }
395 
396 /*
397  *		AllocateRelationDesc
398  *
399  *		This is used to allocate memory for a new relation descriptor
400  *		and initialize the rd_rel field from the given pg_class tuple.
401  */
402 static Relation
AllocateRelationDesc(Form_pg_class relp)403 AllocateRelationDesc(Form_pg_class relp)
404 {
405 	Relation	relation;
406 	MemoryContext oldcxt;
407 	Form_pg_class relationForm;
408 
409 	/* Relcache entries must live in CacheMemoryContext */
410 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
411 
412 	/*
413 	 * allocate and zero space for new relation descriptor
414 	 */
415 	relation = (Relation) palloc0(sizeof(RelationData));
416 
417 	/* make sure relation is marked as having no open file yet */
418 	relation->rd_smgr = NULL;
419 
420 	/*
421 	 * Copy the relation tuple form
422 	 *
423 	 * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
424 	 * variable-length fields (relacl, reloptions) are NOT stored in the
425 	 * relcache --- there'd be little point in it, since we don't copy the
426 	 * tuple's nulls bitmap and hence wouldn't know if the values are valid.
427 	 * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
428 	 * it from the syscache if you need it.  The same goes for the original
429 	 * form of reloptions (however, we do store the parsed form of reloptions
430 	 * in rd_options).
431 	 */
432 	relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
433 
434 	memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
435 
436 	/* initialize relation tuple form */
437 	relation->rd_rel = relationForm;
438 
439 	/* and allocate attribute tuple form storage */
440 	relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts);
441 	/* which we mark as a reference-counted tupdesc */
442 	relation->rd_att->tdrefcount = 1;
443 
444 	MemoryContextSwitchTo(oldcxt);
445 
446 	return relation;
447 }
448 
449 /*
450  * RelationParseRelOptions
451  *		Convert pg_class.reloptions into pre-parsed rd_options
452  *
453  * tuple is the real pg_class tuple (not rd_rel!) for relation
454  *
455  * Note: rd_rel and (if an index) rd_indam must be valid already
456  */
457 static void
RelationParseRelOptions(Relation relation,HeapTuple tuple)458 RelationParseRelOptions(Relation relation, HeapTuple tuple)
459 {
460 	bytea	   *options;
461 	amoptions_function amoptsfn;
462 
463 	relation->rd_options = NULL;
464 
465 	/*
466 	 * Look up any AM-specific parse function; fall out if relkind should not
467 	 * have options.
468 	 */
469 	switch (relation->rd_rel->relkind)
470 	{
471 		case RELKIND_RELATION:
472 		case RELKIND_TOASTVALUE:
473 		case RELKIND_VIEW:
474 		case RELKIND_MATVIEW:
475 		case RELKIND_PARTITIONED_TABLE:
476 			amoptsfn = NULL;
477 			break;
478 		case RELKIND_INDEX:
479 		case RELKIND_PARTITIONED_INDEX:
480 			amoptsfn = relation->rd_indam->amoptions;
481 			break;
482 		default:
483 			return;
484 	}
485 
486 	/*
487 	 * Fetch reloptions from tuple; have to use a hardwired descriptor because
488 	 * we might not have any other for pg_class yet (consider executing this
489 	 * code for pg_class itself)
490 	 */
491 	options = extractRelOptions(tuple, GetPgClassDescriptor(), amoptsfn);
492 
493 	/*
494 	 * Copy parsed data into CacheMemoryContext.  To guard against the
495 	 * possibility of leaks in the reloptions code, we want to do the actual
496 	 * parsing in the caller's memory context and copy the results into
497 	 * CacheMemoryContext after the fact.
498 	 */
499 	if (options)
500 	{
501 		relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
502 												  VARSIZE(options));
503 		memcpy(relation->rd_options, options, VARSIZE(options));
504 		pfree(options);
505 	}
506 }
507 
508 /*
509  *		RelationBuildTupleDesc
510  *
511  *		Form the relation's tuple descriptor from information in
512  *		the pg_attribute, pg_attrdef & pg_constraint system catalogs.
513  */
514 static void
RelationBuildTupleDesc(Relation relation)515 RelationBuildTupleDesc(Relation relation)
516 {
517 	HeapTuple	pg_attribute_tuple;
518 	Relation	pg_attribute_desc;
519 	SysScanDesc pg_attribute_scan;
520 	ScanKeyData skey[2];
521 	int			need;
522 	TupleConstr *constr;
523 	AttrDefault *attrdef = NULL;
524 	AttrMissing *attrmiss = NULL;
525 	int			ndef = 0;
526 
527 	/* copy some fields from pg_class row to rd_att */
528 	relation->rd_att->tdtypeid = relation->rd_rel->reltype;
529 	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
530 
531 	constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
532 												sizeof(TupleConstr));
533 	constr->has_not_null = false;
534 	constr->has_generated_stored = false;
535 
536 	/*
537 	 * Form a scan key that selects only user attributes (attnum > 0).
538 	 * (Eliminating system attribute rows at the index level is lots faster
539 	 * than fetching them.)
540 	 */
541 	ScanKeyInit(&skey[0],
542 				Anum_pg_attribute_attrelid,
543 				BTEqualStrategyNumber, F_OIDEQ,
544 				ObjectIdGetDatum(RelationGetRelid(relation)));
545 	ScanKeyInit(&skey[1],
546 				Anum_pg_attribute_attnum,
547 				BTGreaterStrategyNumber, F_INT2GT,
548 				Int16GetDatum(0));
549 
550 	/*
551 	 * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
552 	 * built the critical relcache entries (this includes initdb and startup
553 	 * without a pg_internal.init file).
554 	 */
555 	pg_attribute_desc = table_open(AttributeRelationId, AccessShareLock);
556 	pg_attribute_scan = systable_beginscan(pg_attribute_desc,
557 										   AttributeRelidNumIndexId,
558 										   criticalRelcachesBuilt,
559 										   NULL,
560 										   2, skey);
561 
562 	/*
563 	 * add attribute data to relation->rd_att
564 	 */
565 	need = RelationGetNumberOfAttributes(relation);
566 
567 	while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
568 	{
569 		Form_pg_attribute attp;
570 		int			attnum;
571 		bool        atthasmissing;
572 
573 		attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
574 
575 		attnum = attp->attnum;
576 		if (attnum <= 0 || attnum > RelationGetNumberOfAttributes(relation))
577 			elog(ERROR, "invalid attribute number %d for %s",
578 				 attp->attnum, RelationGetRelationName(relation));
579 
580 
581 		memcpy(TupleDescAttr(relation->rd_att, attnum - 1),
582 			   attp,
583 			   ATTRIBUTE_FIXED_PART_SIZE);
584 
585 		/*
586 		 * Fix atthasmissing flag - it's only for plain tables. Others
587 		 * should not have missing values set, but there may be some left from
588 		 * before when we placed that check, so this code defensively ignores
589 		 * such values.
590 		 */
591 		atthasmissing = attp->atthasmissing;
592 		if (relation->rd_rel->relkind != RELKIND_RELATION && atthasmissing)
593 		{
594 			Form_pg_attribute nattp;
595 
596 			atthasmissing = false;
597 			nattp = TupleDescAttr(relation->rd_att, attnum - 1);
598 			nattp->atthasmissing = false;
599 		}
600 
601 		/* Update constraint/default info */
602 		if (attp->attnotnull)
603 			constr->has_not_null = true;
604 		if (attp->attgenerated == ATTRIBUTE_GENERATED_STORED)
605 			constr->has_generated_stored = true;
606 
607 		/* If the column has a default, fill it into the attrdef array */
608 		if (attp->atthasdef)
609 		{
610 			if (attrdef == NULL)
611 				attrdef = (AttrDefault *)
612 					MemoryContextAllocZero(CacheMemoryContext,
613 										   RelationGetNumberOfAttributes(relation) *
614 										   sizeof(AttrDefault));
615 			attrdef[ndef].adnum = attnum;
616 			attrdef[ndef].adbin = NULL;
617 
618 			ndef++;
619 		}
620 
621 		/* Likewise for a missing value */
622 		if (atthasmissing)
623 		{
624 			Datum		missingval;
625 			bool		missingNull;
626 
627 			/* Do we have a missing value? */
628 			missingval = heap_getattr(pg_attribute_tuple,
629 									  Anum_pg_attribute_attmissingval,
630 									  pg_attribute_desc->rd_att,
631 									  &missingNull);
632 			if (!missingNull)
633 			{
634 				/* Yes, fetch from the array */
635 				MemoryContext oldcxt;
636 				bool		is_null;
637 				int			one = 1;
638 				Datum		missval;
639 
640 				if (attrmiss == NULL)
641 					attrmiss = (AttrMissing *)
642 						MemoryContextAllocZero(CacheMemoryContext,
643 											   relation->rd_rel->relnatts *
644 											   sizeof(AttrMissing));
645 
646 				missval = array_get_element(missingval,
647 											1,
648 											&one,
649 											-1,
650 											attp->attlen,
651 											attp->attbyval,
652 											attp->attalign,
653 											&is_null);
654 				Assert(!is_null);
655 				if (attp->attbyval)
656 				{
657 					/* for copy by val just copy the datum direct */
658 					attrmiss[attnum - 1].am_value = missval;
659 				}
660 				else
661 				{
662 					/* otherwise copy in the correct context */
663 					oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
664 					attrmiss[attnum - 1].am_value = datumCopy(missval,
665 															  attp->attbyval,
666 															  attp->attlen);
667 					MemoryContextSwitchTo(oldcxt);
668 				}
669 				attrmiss[attnum - 1].am_present = true;
670 			}
671 		}
672 		need--;
673 		if (need == 0)
674 			break;
675 	}
676 
677 	/*
678 	 * end the scan and close the attribute relation
679 	 */
680 	systable_endscan(pg_attribute_scan);
681 	table_close(pg_attribute_desc, AccessShareLock);
682 
683 	if (need != 0)
684 		elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
685 			 need, RelationGetRelid(relation));
686 
687 	/*
688 	 * The attcacheoff values we read from pg_attribute should all be -1
689 	 * ("unknown").  Verify this if assert checking is on.  They will be
690 	 * computed when and if needed during tuple access.
691 	 */
692 #ifdef USE_ASSERT_CHECKING
693 	{
694 		int			i;
695 
696 		for (i = 0; i < RelationGetNumberOfAttributes(relation); i++)
697 			Assert(TupleDescAttr(relation->rd_att, i)->attcacheoff == -1);
698 	}
699 #endif
700 
701 	/*
702 	 * However, we can easily set the attcacheoff value for the first
703 	 * attribute: it must be zero.  This eliminates the need for special cases
704 	 * for attnum=1 that used to exist in fastgetattr() and index_getattr().
705 	 */
706 	if (RelationGetNumberOfAttributes(relation) > 0)
707 		TupleDescAttr(relation->rd_att, 0)->attcacheoff = 0;
708 
709 	/*
710 	 * Set up constraint/default info
711 	 */
712 	if (constr->has_not_null ||
713 		constr->has_generated_stored ||
714 		ndef > 0 ||
715 		attrmiss ||
716 		relation->rd_rel->relchecks)
717 	{
718 		relation->rd_att->constr = constr;
719 
720 		if (ndef > 0)			/* DEFAULTs */
721 		{
722 			if (ndef < RelationGetNumberOfAttributes(relation))
723 				constr->defval = (AttrDefault *)
724 					repalloc(attrdef, ndef * sizeof(AttrDefault));
725 			else
726 				constr->defval = attrdef;
727 			constr->num_defval = ndef;
728 			AttrDefaultFetch(relation);
729 		}
730 		else
731 			constr->num_defval = 0;
732 
733 		constr->missing = attrmiss;
734 
735 		if (relation->rd_rel->relchecks > 0)	/* CHECKs */
736 		{
737 			constr->num_check = relation->rd_rel->relchecks;
738 			constr->check = (ConstrCheck *)
739 				MemoryContextAllocZero(CacheMemoryContext,
740 									   constr->num_check * sizeof(ConstrCheck));
741 			CheckConstraintFetch(relation);
742 		}
743 		else
744 			constr->num_check = 0;
745 	}
746 	else
747 	{
748 		pfree(constr);
749 		relation->rd_att->constr = NULL;
750 	}
751 }
752 
753 /*
754  *		RelationBuildRuleLock
755  *
756  *		Form the relation's rewrite rules from information in
757  *		the pg_rewrite system catalog.
758  *
759  * Note: The rule parsetrees are potentially very complex node structures.
760  * To allow these trees to be freed when the relcache entry is flushed,
761  * we make a private memory context to hold the RuleLock information for
762  * each relcache entry that has associated rules.  The context is used
763  * just for rule info, not for any other subsidiary data of the relcache
764  * entry, because that keeps the update logic in RelationClearRelation()
765  * manageable.  The other subsidiary data structures are simple enough
766  * to be easy to free explicitly, anyway.
767  */
768 static void
RelationBuildRuleLock(Relation relation)769 RelationBuildRuleLock(Relation relation)
770 {
771 	MemoryContext rulescxt;
772 	MemoryContext oldcxt;
773 	HeapTuple	rewrite_tuple;
774 	Relation	rewrite_desc;
775 	TupleDesc	rewrite_tupdesc;
776 	SysScanDesc rewrite_scan;
777 	ScanKeyData key;
778 	RuleLock   *rulelock;
779 	int			numlocks;
780 	RewriteRule **rules;
781 	int			maxlocks;
782 
783 	/*
784 	 * Make the private context.  Assume it'll not contain much data.
785 	 */
786 	rulescxt = AllocSetContextCreate(CacheMemoryContext,
787 									 "relation rules",
788 									 ALLOCSET_SMALL_SIZES);
789 	relation->rd_rulescxt = rulescxt;
790 	MemoryContextCopyAndSetIdentifier(rulescxt,
791 									  RelationGetRelationName(relation));
792 
793 	/*
794 	 * allocate an array to hold the rewrite rules (the array is extended if
795 	 * necessary)
796 	 */
797 	maxlocks = 4;
798 	rules = (RewriteRule **)
799 		MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
800 	numlocks = 0;
801 
802 	/*
803 	 * form a scan key
804 	 */
805 	ScanKeyInit(&key,
806 				Anum_pg_rewrite_ev_class,
807 				BTEqualStrategyNumber, F_OIDEQ,
808 				ObjectIdGetDatum(RelationGetRelid(relation)));
809 
810 	/*
811 	 * open pg_rewrite and begin a scan
812 	 *
813 	 * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
814 	 * be reading the rules in name order, except possibly during
815 	 * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
816 	 * ensures that rules will be fired in name order.
817 	 */
818 	rewrite_desc = table_open(RewriteRelationId, AccessShareLock);
819 	rewrite_tupdesc = RelationGetDescr(rewrite_desc);
820 	rewrite_scan = systable_beginscan(rewrite_desc,
821 									  RewriteRelRulenameIndexId,
822 									  true, NULL,
823 									  1, &key);
824 
825 	while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
826 	{
827 		Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
828 		bool		isnull;
829 		Datum		rule_datum;
830 		char	   *rule_str;
831 		RewriteRule *rule;
832 
833 		rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
834 												  sizeof(RewriteRule));
835 
836 		rule->ruleId = rewrite_form->oid;
837 
838 		rule->event = rewrite_form->ev_type - '0';
839 		rule->enabled = rewrite_form->ev_enabled;
840 		rule->isInstead = rewrite_form->is_instead;
841 
842 		/*
843 		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
844 		 * rule strings are often large enough to be toasted.  To avoid
845 		 * leaking memory in the caller's context, do the detoasting here so
846 		 * we can free the detoasted version.
847 		 */
848 		rule_datum = heap_getattr(rewrite_tuple,
849 								  Anum_pg_rewrite_ev_action,
850 								  rewrite_tupdesc,
851 								  &isnull);
852 		Assert(!isnull);
853 		rule_str = TextDatumGetCString(rule_datum);
854 		oldcxt = MemoryContextSwitchTo(rulescxt);
855 		rule->actions = (List *) stringToNode(rule_str);
856 		MemoryContextSwitchTo(oldcxt);
857 		pfree(rule_str);
858 
859 		rule_datum = heap_getattr(rewrite_tuple,
860 								  Anum_pg_rewrite_ev_qual,
861 								  rewrite_tupdesc,
862 								  &isnull);
863 		Assert(!isnull);
864 		rule_str = TextDatumGetCString(rule_datum);
865 		oldcxt = MemoryContextSwitchTo(rulescxt);
866 		rule->qual = (Node *) stringToNode(rule_str);
867 		MemoryContextSwitchTo(oldcxt);
868 		pfree(rule_str);
869 
870 		/*
871 		 * We want the rule's table references to be checked as though by the
872 		 * table owner, not the user referencing the rule.  Therefore, scan
873 		 * through the rule's actions and set the checkAsUser field on all
874 		 * rtable entries.  We have to look at the qual as well, in case it
875 		 * contains sublinks.
876 		 *
877 		 * The reason for doing this when the rule is loaded, rather than when
878 		 * it is stored, is that otherwise ALTER TABLE OWNER would have to
879 		 * grovel through stored rules to update checkAsUser fields. Scanning
880 		 * the rule tree during load is relatively cheap (compared to
881 		 * constructing it in the first place), so we do it here.
882 		 */
883 		setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
884 		setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
885 
886 		if (numlocks >= maxlocks)
887 		{
888 			maxlocks *= 2;
889 			rules = (RewriteRule **)
890 				repalloc(rules, sizeof(RewriteRule *) * maxlocks);
891 		}
892 		rules[numlocks++] = rule;
893 	}
894 
895 	/*
896 	 * end the scan and close the attribute relation
897 	 */
898 	systable_endscan(rewrite_scan);
899 	table_close(rewrite_desc, AccessShareLock);
900 
901 	/*
902 	 * there might not be any rules (if relhasrules is out-of-date)
903 	 */
904 	if (numlocks == 0)
905 	{
906 		relation->rd_rules = NULL;
907 		relation->rd_rulescxt = NULL;
908 		MemoryContextDelete(rulescxt);
909 		return;
910 	}
911 
912 	/*
913 	 * form a RuleLock and insert into relation
914 	 */
915 	rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
916 	rulelock->numLocks = numlocks;
917 	rulelock->rules = rules;
918 
919 	relation->rd_rules = rulelock;
920 }
921 
922 /*
923  *		equalRuleLocks
924  *
925  *		Determine whether two RuleLocks are equivalent
926  *
927  *		Probably this should be in the rules code someplace...
928  */
929 static bool
equalRuleLocks(RuleLock * rlock1,RuleLock * rlock2)930 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
931 {
932 	int			i;
933 
934 	/*
935 	 * As of 7.3 we assume the rule ordering is repeatable, because
936 	 * RelationBuildRuleLock should read 'em in a consistent order.  So just
937 	 * compare corresponding slots.
938 	 */
939 	if (rlock1 != NULL)
940 	{
941 		if (rlock2 == NULL)
942 			return false;
943 		if (rlock1->numLocks != rlock2->numLocks)
944 			return false;
945 		for (i = 0; i < rlock1->numLocks; i++)
946 		{
947 			RewriteRule *rule1 = rlock1->rules[i];
948 			RewriteRule *rule2 = rlock2->rules[i];
949 
950 			if (rule1->ruleId != rule2->ruleId)
951 				return false;
952 			if (rule1->event != rule2->event)
953 				return false;
954 			if (rule1->enabled != rule2->enabled)
955 				return false;
956 			if (rule1->isInstead != rule2->isInstead)
957 				return false;
958 			if (!equal(rule1->qual, rule2->qual))
959 				return false;
960 			if (!equal(rule1->actions, rule2->actions))
961 				return false;
962 		}
963 	}
964 	else if (rlock2 != NULL)
965 		return false;
966 	return true;
967 }
968 
969 /*
970  *		equalPolicy
971  *
972  *		Determine whether two policies are equivalent
973  */
974 static bool
equalPolicy(RowSecurityPolicy * policy1,RowSecurityPolicy * policy2)975 equalPolicy(RowSecurityPolicy *policy1, RowSecurityPolicy *policy2)
976 {
977 	int			i;
978 	Oid		   *r1,
979 			   *r2;
980 
981 	if (policy1 != NULL)
982 	{
983 		if (policy2 == NULL)
984 			return false;
985 
986 		if (policy1->polcmd != policy2->polcmd)
987 			return false;
988 		if (policy1->hassublinks != policy2->hassublinks)
989 			return false;
990 		if (strcmp(policy1->policy_name, policy2->policy_name) != 0)
991 			return false;
992 		if (ARR_DIMS(policy1->roles)[0] != ARR_DIMS(policy2->roles)[0])
993 			return false;
994 
995 		r1 = (Oid *) ARR_DATA_PTR(policy1->roles);
996 		r2 = (Oid *) ARR_DATA_PTR(policy2->roles);
997 
998 		for (i = 0; i < ARR_DIMS(policy1->roles)[0]; i++)
999 		{
1000 			if (r1[i] != r2[i])
1001 				return false;
1002 		}
1003 
1004 		if (!equal(policy1->qual, policy2->qual))
1005 			return false;
1006 		if (!equal(policy1->with_check_qual, policy2->with_check_qual))
1007 			return false;
1008 	}
1009 	else if (policy2 != NULL)
1010 		return false;
1011 
1012 	return true;
1013 }
1014 
1015 /*
1016  *		equalRSDesc
1017  *
1018  *		Determine whether two RowSecurityDesc's are equivalent
1019  */
1020 static bool
equalRSDesc(RowSecurityDesc * rsdesc1,RowSecurityDesc * rsdesc2)1021 equalRSDesc(RowSecurityDesc *rsdesc1, RowSecurityDesc *rsdesc2)
1022 {
1023 	ListCell   *lc,
1024 			   *rc;
1025 
1026 	if (rsdesc1 == NULL && rsdesc2 == NULL)
1027 		return true;
1028 
1029 	if ((rsdesc1 != NULL && rsdesc2 == NULL) ||
1030 		(rsdesc1 == NULL && rsdesc2 != NULL))
1031 		return false;
1032 
1033 	if (list_length(rsdesc1->policies) != list_length(rsdesc2->policies))
1034 		return false;
1035 
1036 	/* RelationBuildRowSecurity should build policies in order */
1037 	forboth(lc, rsdesc1->policies, rc, rsdesc2->policies)
1038 	{
1039 		RowSecurityPolicy *l = (RowSecurityPolicy *) lfirst(lc);
1040 		RowSecurityPolicy *r = (RowSecurityPolicy *) lfirst(rc);
1041 
1042 		if (!equalPolicy(l, r))
1043 			return false;
1044 	}
1045 
1046 	return true;
1047 }
1048 
/*
 *		RelationBuildDesc
 *
 *		Build a relation descriptor.  The caller must hold at least
 *		AccessShareLock on the target relid.
 *
 *		targetRelId is the OID whose pg_class row is looked up.  The new
 *		descriptor is inserted into the hash table if insertIt is true.
 *
 *		While the build is underway the relation is registered in
 *		in_progress_list.  If that entry is found marked "invalidated" once
 *		the descriptor has been assembled, the partial result is destroyed
 *		and the build is redone starting from the pg_class lookup.
 *
 *		Returns NULL if no pg_class row could be found for the given relid
 *		(suggesting we are trying to access a just-deleted relation).
 *		Any other error is reported via elog.
 */
static Relation
RelationBuildDesc(Oid targetRelId, bool insertIt)
{
	int			in_progress_offset;
	Relation	relation;
	Oid			relid;
	HeapTuple	pg_class_tuple;
	Form_pg_class relp;

	/*
	 * This function and its subroutines can allocate a good deal of transient
	 * data in CurrentMemoryContext.  Traditionally we've just leaked that
	 * data, reasoning that the caller's context is at worst of transaction
	 * scope, and relcache loads shouldn't happen so often that it's essential
	 * to recover transient data before end of statement/transaction.  However
	 * that's definitely not true in clobber-cache test builds, and perhaps
	 * it's not true in other cases.  If RECOVER_RELATION_BUILD_MEMORY is not
	 * zero, arrange to allocate the junk in a temporary context that we'll
	 * free before returning.  Make it a child of caller's context so that it
	 * will get cleaned up appropriately if we error out partway through.
	 */
#if RECOVER_RELATION_BUILD_MEMORY
	MemoryContext tmpcxt;
	MemoryContext oldcxt;

	tmpcxt = AllocSetContextCreate(CurrentMemoryContext,
								   "RelationBuildDesc workspace",
								   ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(tmpcxt);
#endif

	/*
	 * Register to catch invalidation messages.  Claim the next slot of
	 * in_progress_list, doubling the array first if it is full.
	 */
	if (in_progress_list_len >= in_progress_list_maxlen)
	{
		int			allocsize;

		allocsize = in_progress_list_maxlen * 2;
		in_progress_list = repalloc(in_progress_list,
									allocsize * sizeof(*in_progress_list));
		in_progress_list_maxlen = allocsize;
	}
	in_progress_offset = in_progress_list_len++;
	in_progress_list[in_progress_offset].reloid = targetRelId;

	/*
	 * A rebuild restarts here: the list slot claimed above is kept, and only
	 * its "invalidated" flag is cleared before trying again.
	 */
retry:
	in_progress_list[in_progress_offset].invalidated = false;

	/*
	 * find the tuple in pg_class corresponding to the given relation id
	 */
	pg_class_tuple = ScanPgRelation(targetRelId, true, false);

	/*
	 * if no such tuple exists, return NULL
	 */
	if (!HeapTupleIsValid(pg_class_tuple))
	{
#if RECOVER_RELATION_BUILD_MEMORY
		/* Return to caller's context, and blow away the temporary context */
		MemoryContextSwitchTo(oldcxt);
		MemoryContextDelete(tmpcxt);
#endif
		/* Our slot must be the last one in the list; give it back */
		Assert(in_progress_offset + 1 == in_progress_list_len);
		in_progress_list_len--;
		return NULL;
	}

	/*
	 * get information from the pg_class_tuple
	 */
	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
	relid = relp->oid;
	Assert(relid == targetRelId);

	/*
	 * allocate storage for the relation descriptor, and copy pg_class_tuple
	 * to relation->rd_rel.
	 */
	relation = AllocateRelationDesc(relp);

	/*
	 * initialize the relation's relation id (relation->rd_id)
	 */
	RelationGetRelid(relation) = relid;

	/*
	 * Normal relations are not nailed into the cache.  Since we don't flush
	 * new relations, it won't be new.  It could be temp though.
	 */
	relation->rd_refcnt = 0;
	relation->rd_isnailed = false;
	relation->rd_createSubid = InvalidSubTransactionId;
	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
	relation->rd_droppedSubid = InvalidSubTransactionId;

	/* Set rd_backend and rd_islocaltemp according to relpersistence */
	switch (relation->rd_rel->relpersistence)
	{
		case RELPERSISTENCE_UNLOGGED:
		case RELPERSISTENCE_PERMANENT:
			relation->rd_backend = InvalidBackendId;
			relation->rd_islocaltemp = false;
			break;
		case RELPERSISTENCE_TEMP:
			if (isTempOrTempToastNamespace(relation->rd_rel->relnamespace))
			{
				relation->rd_backend = BackendIdForTempRelations();
				relation->rd_islocaltemp = true;
			}
			else
			{
				/*
				 * If it's a temp table, but not one of ours, we have to use
				 * the slow, grotty method to figure out the owning backend.
				 *
				 * Note: it's possible that rd_backend gets set to MyBackendId
				 * here, in case we are looking at a pg_class entry left over
				 * from a crashed backend that coincidentally had the same
				 * BackendId we're using.  We should *not* consider such a
				 * table to be "ours"; this is why we need the separate
				 * rd_islocaltemp flag.  The pg_class entry will get flushed
				 * if/when we clean out the corresponding temp table namespace
				 * in preparation for using it.
				 */
				relation->rd_backend =
					GetTempNamespaceBackendId(relation->rd_rel->relnamespace);
				Assert(relation->rd_backend != InvalidBackendId);
				relation->rd_islocaltemp = false;
			}
			break;
		default:
			elog(ERROR, "invalid relpersistence: %c",
				 relation->rd_rel->relpersistence);
			break;
	}

	/*
	 * initialize the tuple descriptor (relation->rd_att).
	 */
	RelationBuildTupleDesc(relation);

	/*
	 * Fetch rules, triggers and row-security policies that affect this
	 * relation.  Each is loaded only when the matching pg_class flag is set;
	 * otherwise the corresponding field is simply cleared.
	 */
	if (relation->rd_rel->relhasrules)
		RelationBuildRuleLock(relation);
	else
	{
		relation->rd_rules = NULL;
		relation->rd_rulescxt = NULL;
	}

	if (relation->rd_rel->relhastriggers)
		RelationBuildTriggers(relation);
	else
		relation->trigdesc = NULL;

	if (relation->rd_rel->relrowsecurity)
		RelationBuildRowSecurity(relation);
	else
		relation->rd_rsdesc = NULL;

	/* foreign key data is not loaded till asked for */
	relation->rd_fkeylist = NIL;
	relation->rd_fkeyvalid = false;

	/* partitioning data is not loaded till asked for */
	relation->rd_partkey = NULL;
	relation->rd_partkeycxt = NULL;
	relation->rd_partdesc = NULL;
	relation->rd_pdcxt = NULL;
	relation->rd_partcheck = NIL;
	relation->rd_partcheckvalid = false;
	relation->rd_partcheckcxt = NULL;

	/*
	 * initialize access method information
	 *
	 * Indexes get index AM info and table-like relkinds get table AM info.
	 * The relkinds in the last group only have their relam asserted to be
	 * unset.  There is no default case, so any relkind not listed here gets
	 * no access method setup at all.
	 */
	switch (relation->rd_rel->relkind)
	{
		case RELKIND_INDEX:
		case RELKIND_PARTITIONED_INDEX:
			Assert(relation->rd_rel->relam != InvalidOid);
			RelationInitIndexAccessInfo(relation);
			break;
		case RELKIND_RELATION:
		case RELKIND_TOASTVALUE:
		case RELKIND_MATVIEW:
			Assert(relation->rd_rel->relam != InvalidOid);
			RelationInitTableAccessMethod(relation);
			break;
		case RELKIND_SEQUENCE:
			/* relam is expected to be unset, yet table AM info is set up */
			Assert(relation->rd_rel->relam == InvalidOid);
			RelationInitTableAccessMethod(relation);
			break;
		case RELKIND_VIEW:
		case RELKIND_COMPOSITE_TYPE:
		case RELKIND_FOREIGN_TABLE:
		case RELKIND_PARTITIONED_TABLE:
			Assert(relation->rd_rel->relam == InvalidOid);
			break;
	}

	/* extract reloptions if any */
	RelationParseRelOptions(relation, pg_class_tuple);

	/*
	 * initialize the relation lock manager information
	 */
	RelationInitLockInfo(relation); /* see lmgr.c */

	/*
	 * initialize physical addressing information for the relation
	 */
	RelationInitPhysicalAddr(relation);

	/* make sure relation is marked as having no open file yet */
	relation->rd_smgr = NULL;

	/*
	 * now we can free the memory allocated for pg_class_tuple
	 */
	heap_freetuple(pg_class_tuple);

	/*
	 * If an invalidation arrived mid-build, start over.  Between here and the
	 * end of this function, don't add code that does or reasonably could read
	 * system catalogs.  That range must be free from invalidation processing
	 * for the !insertIt case.  For the insertIt case, RelationCacheInsert()
	 * will enroll this relation in ordinary relcache invalidation processing.
	 */
	if (in_progress_list[in_progress_offset].invalidated)
	{
		/* Throw away the descriptor built so far and rebuild it */
		RelationDestroyRelation(relation, false);
		goto retry;
	}

	/* Build finished: our slot must be the last one; give it back */
	Assert(in_progress_offset + 1 == in_progress_list_len);
	in_progress_list_len--;

	/*
	 * Insert newly created relation into relcache hash table, if requested.
	 *
	 * There is one scenario in which we might find a hashtable entry already
	 * present, even though our caller failed to find it: if the relation is a
	 * system catalog or index that's used during relcache load, we might have
	 * recursively created the same relcache entry during the preceding steps.
	 * So allow RelationCacheInsert to delete any already-present relcache
	 * entry for the same OID.  The already-present entry should have refcount
	 * zero (else somebody forgot to close it); in the event that it doesn't,
	 * we'll elog a WARNING and leak the already-present entry.
	 */
	if (insertIt)
		RelationCacheInsert(relation, true);

	/* It's fully valid */
	relation->rd_isvalid = true;

#if RECOVER_RELATION_BUILD_MEMORY
	/* Return to caller's context, and blow away the temporary context */
	MemoryContextSwitchTo(oldcxt);
	MemoryContextDelete(tmpcxt);
#endif

	return relation;
}
1324 
1325 /*
1326  * Initialize the physical addressing info (RelFileNode) for a relcache entry
1327  *
1328  * Note: at the physical level, relations in the pg_global tablespace must
1329  * be treated as shared, even if relisshared isn't set.  Hence we do not
1330  * look at relisshared here.
1331  */
1332 static void
RelationInitPhysicalAddr(Relation relation)1333 RelationInitPhysicalAddr(Relation relation)
1334 {
1335 	Oid			oldnode = relation->rd_node.relNode;
1336 
1337 	/* these relations kinds never have storage */
1338 	if (!RELKIND_HAS_STORAGE(relation->rd_rel->relkind))
1339 		return;
1340 
1341 	if (relation->rd_rel->reltablespace)
1342 		relation->rd_node.spcNode = relation->rd_rel->reltablespace;
1343 	else
1344 		relation->rd_node.spcNode = MyDatabaseTableSpace;
1345 	if (relation->rd_node.spcNode == GLOBALTABLESPACE_OID)
1346 		relation->rd_node.dbNode = InvalidOid;
1347 	else
1348 		relation->rd_node.dbNode = MyDatabaseId;
1349 
1350 	if (relation->rd_rel->relfilenode)
1351 	{
1352 		/*
1353 		 * Even if we are using a decoding snapshot that doesn't represent the
1354 		 * current state of the catalog we need to make sure the filenode
1355 		 * points to the current file since the older file will be gone (or
1356 		 * truncated). The new file will still contain older rows so lookups
1357 		 * in them will work correctly. This wouldn't work correctly if
1358 		 * rewrites were allowed to change the schema in an incompatible way,
1359 		 * but those are prevented both on catalog tables and on user tables
1360 		 * declared as additional catalog tables.
1361 		 */
1362 		if (HistoricSnapshotActive()
1363 			&& RelationIsAccessibleInLogicalDecoding(relation)
1364 			&& IsTransactionState())
1365 		{
1366 			HeapTuple	phys_tuple;
1367 			Form_pg_class physrel;
1368 
1369 			phys_tuple = ScanPgRelation(RelationGetRelid(relation),
1370 										RelationGetRelid(relation) != ClassOidIndexId,
1371 										true);
1372 			if (!HeapTupleIsValid(phys_tuple))
1373 				elog(ERROR, "could not find pg_class entry for %u",
1374 					 RelationGetRelid(relation));
1375 			physrel = (Form_pg_class) GETSTRUCT(phys_tuple);
1376 
1377 			relation->rd_rel->reltablespace = physrel->reltablespace;
1378 			relation->rd_rel->relfilenode = physrel->relfilenode;
1379 			heap_freetuple(phys_tuple);
1380 		}
1381 
1382 		relation->rd_node.relNode = relation->rd_rel->relfilenode;
1383 	}
1384 	else
1385 	{
1386 		/* Consult the relation mapper */
1387 		relation->rd_node.relNode =
1388 			RelationMapOidToFilenode(relation->rd_id,
1389 									 relation->rd_rel->relisshared);
1390 		if (!OidIsValid(relation->rd_node.relNode))
1391 			elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1392 				 RelationGetRelationName(relation), relation->rd_id);
1393 	}
1394 
1395 	/*
1396 	 * For RelationNeedsWAL() to answer correctly on parallel workers, restore
1397 	 * rd_firstRelfilenodeSubid.  No subtransactions start or end while in
1398 	 * parallel mode, so the specific SubTransactionId does not matter.
1399 	 */
1400 	if (IsParallelWorker() && oldnode != relation->rd_node.relNode)
1401 	{
1402 		if (RelFileNodeSkippingWAL(relation->rd_node))
1403 			relation->rd_firstRelfilenodeSubid = TopSubTransactionId;
1404 		else
1405 			relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
1406 	}
1407 }
1408 
1409 /*
1410  * Fill in the IndexAmRoutine for an index relation.
1411  *
1412  * relation's rd_amhandler and rd_indexcxt must be valid already.
1413  */
1414 static void
InitIndexAmRoutine(Relation relation)1415 InitIndexAmRoutine(Relation relation)
1416 {
1417 	IndexAmRoutine *cached,
1418 			   *tmp;
1419 
1420 	/*
1421 	 * Call the amhandler in current, short-lived memory context, just in case
1422 	 * it leaks anything (it probably won't, but let's be paranoid).
1423 	 */
1424 	tmp = GetIndexAmRoutine(relation->rd_amhandler);
1425 
1426 	/* OK, now transfer the data into relation's rd_indexcxt. */
1427 	cached = (IndexAmRoutine *) MemoryContextAlloc(relation->rd_indexcxt,
1428 												   sizeof(IndexAmRoutine));
1429 	memcpy(cached, tmp, sizeof(IndexAmRoutine));
1430 	relation->rd_indam = cached;
1431 
1432 	pfree(tmp);
1433 }
1434 
1435 /*
1436  * Initialize index-access-method support data for an index relation
1437  */
1438 void
RelationInitIndexAccessInfo(Relation relation)1439 RelationInitIndexAccessInfo(Relation relation)
1440 {
1441 	HeapTuple	tuple;
1442 	Form_pg_am	aform;
1443 	Datum		indcollDatum;
1444 	Datum		indclassDatum;
1445 	Datum		indoptionDatum;
1446 	bool		isnull;
1447 	oidvector  *indcoll;
1448 	oidvector  *indclass;
1449 	int2vector *indoption;
1450 	MemoryContext indexcxt;
1451 	MemoryContext oldcontext;
1452 	int			indnatts;
1453 	int			indnkeyatts;
1454 	uint16		amsupport;
1455 
1456 	/*
1457 	 * Make a copy of the pg_index entry for the index.  Since pg_index
1458 	 * contains variable-length and possibly-null fields, we have to do this
1459 	 * honestly rather than just treating it as a Form_pg_index struct.
1460 	 */
1461 	tuple = SearchSysCache1(INDEXRELID,
1462 							ObjectIdGetDatum(RelationGetRelid(relation)));
1463 	if (!HeapTupleIsValid(tuple))
1464 		elog(ERROR, "cache lookup failed for index %u",
1465 			 RelationGetRelid(relation));
1466 	oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
1467 	relation->rd_indextuple = heap_copytuple(tuple);
1468 	relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
1469 	MemoryContextSwitchTo(oldcontext);
1470 	ReleaseSysCache(tuple);
1471 
1472 	/*
1473 	 * Look up the index's access method, save the OID of its handler function
1474 	 */
1475 	tuple = SearchSysCache1(AMOID, ObjectIdGetDatum(relation->rd_rel->relam));
1476 	if (!HeapTupleIsValid(tuple))
1477 		elog(ERROR, "cache lookup failed for access method %u",
1478 			 relation->rd_rel->relam);
1479 	aform = (Form_pg_am) GETSTRUCT(tuple);
1480 	relation->rd_amhandler = aform->amhandler;
1481 	ReleaseSysCache(tuple);
1482 
1483 	indnatts = RelationGetNumberOfAttributes(relation);
1484 	if (indnatts != IndexRelationGetNumberOfAttributes(relation))
1485 		elog(ERROR, "relnatts disagrees with indnatts for index %u",
1486 			 RelationGetRelid(relation));
1487 	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(relation);
1488 
1489 	/*
1490 	 * Make the private context to hold index access info.  The reason we need
1491 	 * a context, and not just a couple of pallocs, is so that we won't leak
1492 	 * any subsidiary info attached to fmgr lookup records.
1493 	 */
1494 	indexcxt = AllocSetContextCreate(CacheMemoryContext,
1495 									 "index info",
1496 									 ALLOCSET_SMALL_SIZES);
1497 	relation->rd_indexcxt = indexcxt;
1498 	MemoryContextCopyAndSetIdentifier(indexcxt,
1499 									  RelationGetRelationName(relation));
1500 
1501 	/*
1502 	 * Now we can fetch the index AM's API struct
1503 	 */
1504 	InitIndexAmRoutine(relation);
1505 
1506 	/*
1507 	 * Allocate arrays to hold data. Opclasses are not used for included
1508 	 * columns, so allocate them for indnkeyatts only.
1509 	 */
1510 	relation->rd_opfamily = (Oid *)
1511 		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
1512 	relation->rd_opcintype = (Oid *)
1513 		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
1514 
1515 	amsupport = relation->rd_indam->amsupport;
1516 	if (amsupport > 0)
1517 	{
1518 		int			nsupport = indnatts * amsupport;
1519 
1520 		relation->rd_support = (RegProcedure *)
1521 			MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
1522 		relation->rd_supportinfo = (FmgrInfo *)
1523 			MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
1524 	}
1525 	else
1526 	{
1527 		relation->rd_support = NULL;
1528 		relation->rd_supportinfo = NULL;
1529 	}
1530 
1531 	relation->rd_indcollation = (Oid *)
1532 		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
1533 
1534 	relation->rd_indoption = (int16 *)
1535 		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(int16));
1536 
1537 	/*
1538 	 * indcollation cannot be referenced directly through the C struct,
1539 	 * because it comes after the variable-width indkey field.  Must extract
1540 	 * the datum the hard way...
1541 	 */
1542 	indcollDatum = fastgetattr(relation->rd_indextuple,
1543 							   Anum_pg_index_indcollation,
1544 							   GetPgIndexDescriptor(),
1545 							   &isnull);
1546 	Assert(!isnull);
1547 	indcoll = (oidvector *) DatumGetPointer(indcollDatum);
1548 	memcpy(relation->rd_indcollation, indcoll->values, indnkeyatts * sizeof(Oid));
1549 
1550 	/*
1551 	 * indclass cannot be referenced directly through the C struct, because it
1552 	 * comes after the variable-width indkey field.  Must extract the datum
1553 	 * the hard way...
1554 	 */
1555 	indclassDatum = fastgetattr(relation->rd_indextuple,
1556 								Anum_pg_index_indclass,
1557 								GetPgIndexDescriptor(),
1558 								&isnull);
1559 	Assert(!isnull);
1560 	indclass = (oidvector *) DatumGetPointer(indclassDatum);
1561 
1562 	/*
1563 	 * Fill the support procedure OID array, as well as the info about
1564 	 * opfamilies and opclass input types.  (aminfo and supportinfo are left
1565 	 * as zeroes, and are filled on-the-fly when used)
1566 	 */
1567 	IndexSupportInitialize(indclass, relation->rd_support,
1568 						   relation->rd_opfamily, relation->rd_opcintype,
1569 						   amsupport, indnkeyatts);
1570 
1571 	/*
1572 	 * Similarly extract indoption and copy it to the cache entry
1573 	 */
1574 	indoptionDatum = fastgetattr(relation->rd_indextuple,
1575 								 Anum_pg_index_indoption,
1576 								 GetPgIndexDescriptor(),
1577 								 &isnull);
1578 	Assert(!isnull);
1579 	indoption = (int2vector *) DatumGetPointer(indoptionDatum);
1580 	memcpy(relation->rd_indoption, indoption->values, indnkeyatts * sizeof(int16));
1581 
1582 	(void) RelationGetIndexAttOptions(relation, false);
1583 
1584 	/*
1585 	 * expressions, predicate, exclusion caches will be filled later
1586 	 */
1587 	relation->rd_indexprs = NIL;
1588 	relation->rd_indpred = NIL;
1589 	relation->rd_exclops = NULL;
1590 	relation->rd_exclprocs = NULL;
1591 	relation->rd_exclstrats = NULL;
1592 	relation->rd_amcache = NULL;
1593 }
1594 
1595 /*
1596  * IndexSupportInitialize
1597  *		Initializes an index's cached opclass information,
1598  *		given the index's pg_index.indclass entry.
1599  *
1600  * Data is returned into *indexSupport, *opFamily, and *opcInType,
1601  * which are arrays allocated by the caller.
1602  *
1603  * The caller also passes maxSupportNumber and maxAttributeNumber, since these
1604  * indicate the size of the arrays it has allocated --- but in practice these
1605  * numbers must always match those obtainable from the system catalog entries
1606  * for the index and access method.
1607  */
1608 static void
IndexSupportInitialize(oidvector * indclass,RegProcedure * indexSupport,Oid * opFamily,Oid * opcInType,StrategyNumber maxSupportNumber,AttrNumber maxAttributeNumber)1609 IndexSupportInitialize(oidvector *indclass,
1610 					   RegProcedure *indexSupport,
1611 					   Oid *opFamily,
1612 					   Oid *opcInType,
1613 					   StrategyNumber maxSupportNumber,
1614 					   AttrNumber maxAttributeNumber)
1615 {
1616 	int			attIndex;
1617 
1618 	for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1619 	{
1620 		OpClassCacheEnt *opcentry;
1621 
1622 		if (!OidIsValid(indclass->values[attIndex]))
1623 			elog(ERROR, "bogus pg_index tuple");
1624 
1625 		/* look up the info for this opclass, using a cache */
1626 		opcentry = LookupOpclassInfo(indclass->values[attIndex],
1627 									 maxSupportNumber);
1628 
1629 		/* copy cached data into relcache entry */
1630 		opFamily[attIndex] = opcentry->opcfamily;
1631 		opcInType[attIndex] = opcentry->opcintype;
1632 		if (maxSupportNumber > 0)
1633 			memcpy(&indexSupport[attIndex * maxSupportNumber],
1634 				   opcentry->supportProcs,
1635 				   maxSupportNumber * sizeof(RegProcedure));
1636 	}
1637 }
1638 
1639 /*
1640  * LookupOpclassInfo
1641  *
1642  * This routine maintains a per-opclass cache of the information needed
1643  * by IndexSupportInitialize().  This is more efficient than relying on
1644  * the catalog cache, because we can load all the info about a particular
1645  * opclass in a single indexscan of pg_amproc.
1646  *
1647  * The information from pg_am about expected range of support function
1648  * numbers is passed in, rather than being looked up, mainly because the
1649  * caller will have it already.
1650  *
1651  * Note there is no provision for flushing the cache.  This is OK at the
1652  * moment because there is no way to ALTER any interesting properties of an
1653  * existing opclass --- all you can do is drop it, which will result in
1654  * a useless but harmless dead entry in the cache.  To support altering
1655  * opclass membership (not the same as opfamily membership!), we'd need to
1656  * be able to flush this cache as well as the contents of relcache entries
1657  * for indexes.
1658  */
1659 static OpClassCacheEnt *
LookupOpclassInfo(Oid operatorClassOid,StrategyNumber numSupport)1660 LookupOpclassInfo(Oid operatorClassOid,
1661 				  StrategyNumber numSupport)
1662 {
1663 	OpClassCacheEnt *opcentry;
1664 	bool		found;
1665 	Relation	rel;
1666 	SysScanDesc scan;
1667 	ScanKeyData skey[3];
1668 	HeapTuple	htup;
1669 	bool		indexOK;
1670 
1671 	if (OpClassCache == NULL)
1672 	{
1673 		/* First time through: initialize the opclass cache */
1674 		HASHCTL		ctl;
1675 
1676 		/* Also make sure CacheMemoryContext exists */
1677 		if (!CacheMemoryContext)
1678 			CreateCacheMemoryContext();
1679 
1680 		MemSet(&ctl, 0, sizeof(ctl));
1681 		ctl.keysize = sizeof(Oid);
1682 		ctl.entrysize = sizeof(OpClassCacheEnt);
1683 		OpClassCache = hash_create("Operator class cache", 64,
1684 								   &ctl, HASH_ELEM | HASH_BLOBS);
1685 	}
1686 
1687 	opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1688 											   (void *) &operatorClassOid,
1689 											   HASH_ENTER, &found);
1690 
1691 	if (!found)
1692 	{
1693 		/* Initialize new entry */
1694 		opcentry->valid = false;	/* until known OK */
1695 		opcentry->numSupport = numSupport;
1696 		opcentry->supportProcs = NULL;	/* filled below */
1697 	}
1698 	else
1699 	{
1700 		Assert(numSupport == opcentry->numSupport);
1701 	}
1702 
1703 	/*
1704 	 * When aggressively testing cache-flush hazards, we disable the operator
1705 	 * class cache and force reloading of the info on each call.  This models
1706 	 * no real-world behavior, since the cache entries are never invalidated
1707 	 * otherwise.  However it can be helpful for detecting bugs in the cache
1708 	 * loading logic itself, such as reliance on a non-nailed index.  Given
1709 	 * the limited use-case and the fact that this adds a great deal of
1710 	 * expense, we enable it only in CLOBBER_CACHE_RECURSIVELY mode.
1711 	 */
1712 #if defined(CLOBBER_CACHE_RECURSIVELY)
1713 	opcentry->valid = false;
1714 #endif
1715 
1716 	if (opcentry->valid)
1717 		return opcentry;
1718 
1719 	/*
1720 	 * Need to fill in new entry.  First allocate space, unless we already did
1721 	 * so in some previous attempt.
1722 	 */
1723 	if (opcentry->supportProcs == NULL && numSupport > 0)
1724 		opcentry->supportProcs = (RegProcedure *)
1725 			MemoryContextAllocZero(CacheMemoryContext,
1726 								   numSupport * sizeof(RegProcedure));
1727 
1728 	/*
1729 	 * To avoid infinite recursion during startup, force heap scans if we're
1730 	 * looking up info for the opclasses used by the indexes we would like to
1731 	 * reference here.
1732 	 */
1733 	indexOK = criticalRelcachesBuilt ||
1734 		(operatorClassOid != OID_BTREE_OPS_OID &&
1735 		 operatorClassOid != INT2_BTREE_OPS_OID);
1736 
1737 	/*
1738 	 * We have to fetch the pg_opclass row to determine its opfamily and
1739 	 * opcintype, which are needed to look up related operators and functions.
1740 	 * It'd be convenient to use the syscache here, but that probably doesn't
1741 	 * work while bootstrapping.
1742 	 */
1743 	ScanKeyInit(&skey[0],
1744 				Anum_pg_opclass_oid,
1745 				BTEqualStrategyNumber, F_OIDEQ,
1746 				ObjectIdGetDatum(operatorClassOid));
1747 	rel = table_open(OperatorClassRelationId, AccessShareLock);
1748 	scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
1749 							  NULL, 1, skey);
1750 
1751 	if (HeapTupleIsValid(htup = systable_getnext(scan)))
1752 	{
1753 		Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);
1754 
1755 		opcentry->opcfamily = opclassform->opcfamily;
1756 		opcentry->opcintype = opclassform->opcintype;
1757 	}
1758 	else
1759 		elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
1760 
1761 	systable_endscan(scan);
1762 	table_close(rel, AccessShareLock);
1763 
1764 	/*
1765 	 * Scan pg_amproc to obtain support procs for the opclass.  We only fetch
1766 	 * the default ones (those with lefttype = righttype = opcintype).
1767 	 */
1768 	if (numSupport > 0)
1769 	{
1770 		ScanKeyInit(&skey[0],
1771 					Anum_pg_amproc_amprocfamily,
1772 					BTEqualStrategyNumber, F_OIDEQ,
1773 					ObjectIdGetDatum(opcentry->opcfamily));
1774 		ScanKeyInit(&skey[1],
1775 					Anum_pg_amproc_amproclefttype,
1776 					BTEqualStrategyNumber, F_OIDEQ,
1777 					ObjectIdGetDatum(opcentry->opcintype));
1778 		ScanKeyInit(&skey[2],
1779 					Anum_pg_amproc_amprocrighttype,
1780 					BTEqualStrategyNumber, F_OIDEQ,
1781 					ObjectIdGetDatum(opcentry->opcintype));
1782 		rel = table_open(AccessMethodProcedureRelationId, AccessShareLock);
1783 		scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1784 								  NULL, 3, skey);
1785 
1786 		while (HeapTupleIsValid(htup = systable_getnext(scan)))
1787 		{
1788 			Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1789 
1790 			if (amprocform->amprocnum <= 0 ||
1791 				(StrategyNumber) amprocform->amprocnum > numSupport)
1792 				elog(ERROR, "invalid amproc number %d for opclass %u",
1793 					 amprocform->amprocnum, operatorClassOid);
1794 
1795 			opcentry->supportProcs[amprocform->amprocnum - 1] =
1796 				amprocform->amproc;
1797 		}
1798 
1799 		systable_endscan(scan);
1800 		table_close(rel, AccessShareLock);
1801 	}
1802 
1803 	opcentry->valid = true;
1804 	return opcentry;
1805 }
1806 
1807 /*
1808  * Fill in the TableAmRoutine for a relation
1809  *
1810  * relation's rd_amhandler must be valid already.
1811  */
1812 static void
InitTableAmRoutine(Relation relation)1813 InitTableAmRoutine(Relation relation)
1814 {
1815 	relation->rd_tableam = GetTableAmRoutine(relation->rd_amhandler);
1816 }
1817 
1818 /*
1819  * Initialize table access method support for a table like relation
1820  */
1821 void
RelationInitTableAccessMethod(Relation relation)1822 RelationInitTableAccessMethod(Relation relation)
1823 {
1824 	HeapTuple	tuple;
1825 	Form_pg_am	aform;
1826 
1827 	if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1828 	{
1829 		/*
1830 		 * Sequences are currently accessed like heap tables, but it doesn't
1831 		 * seem prudent to show that in the catalog. So just overwrite it
1832 		 * here.
1833 		 */
1834 		relation->rd_amhandler = HEAP_TABLE_AM_HANDLER_OID;
1835 	}
1836 	else if (IsCatalogRelation(relation))
1837 	{
1838 		/*
1839 		 * Avoid doing a syscache lookup for catalog tables.
1840 		 */
1841 		Assert(relation->rd_rel->relam == HEAP_TABLE_AM_OID);
1842 		relation->rd_amhandler = HEAP_TABLE_AM_HANDLER_OID;
1843 	}
1844 	else
1845 	{
1846 		/*
1847 		 * Look up the table access method, save the OID of its handler
1848 		 * function.
1849 		 */
1850 		Assert(relation->rd_rel->relam != InvalidOid);
1851 		tuple = SearchSysCache1(AMOID,
1852 								ObjectIdGetDatum(relation->rd_rel->relam));
1853 		if (!HeapTupleIsValid(tuple))
1854 			elog(ERROR, "cache lookup failed for access method %u",
1855 				 relation->rd_rel->relam);
1856 		aform = (Form_pg_am) GETSTRUCT(tuple);
1857 		relation->rd_amhandler = aform->amhandler;
1858 		ReleaseSysCache(tuple);
1859 	}
1860 
1861 	/*
1862 	 * Now we can fetch the table AM's API struct
1863 	 */
1864 	InitTableAmRoutine(relation);
1865 }
1866 
1867 /*
1868  *		formrdesc
1869  *
1870  *		This is a special cut-down version of RelationBuildDesc(),
1871  *		used while initializing the relcache.
1872  *		The relation descriptor is built just from the supplied parameters,
1873  *		without actually looking at any system table entries.  We cheat
1874  *		quite a lot since we only need to work for a few basic system
1875  *		catalogs.
1876  *
1877  * The catalogs this is used for can't have constraints (except attnotnull),
1878  * default values, rules, or triggers, since we don't cope with any of that.
1879  * (Well, actually, this only matters for properties that need to be valid
1880  * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
1881  * these properties matter then...)
1882  *
1883  * NOTE: we assume we are already switched into CacheMemoryContext.
1884  */
1885 static void
formrdesc(const char * relationName,Oid relationReltype,bool isshared,int natts,const FormData_pg_attribute * attrs)1886 formrdesc(const char *relationName, Oid relationReltype,
1887 		  bool isshared,
1888 		  int natts, const FormData_pg_attribute *attrs)
1889 {
1890 	Relation	relation;
1891 	int			i;
1892 	bool		has_not_null;
1893 
1894 	/*
1895 	 * allocate new relation desc, clear all fields of reldesc
1896 	 */
1897 	relation = (Relation) palloc0(sizeof(RelationData));
1898 
1899 	/* make sure relation is marked as having no open file yet */
1900 	relation->rd_smgr = NULL;
1901 
1902 	/*
1903 	 * initialize reference count: 1 because it is nailed in cache
1904 	 */
1905 	relation->rd_refcnt = 1;
1906 
1907 	/*
1908 	 * all entries built with this routine are nailed-in-cache; none are for
1909 	 * new or temp relations.
1910 	 */
1911 	relation->rd_isnailed = true;
1912 	relation->rd_createSubid = InvalidSubTransactionId;
1913 	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1914 	relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
1915 	relation->rd_droppedSubid = InvalidSubTransactionId;
1916 	relation->rd_backend = InvalidBackendId;
1917 	relation->rd_islocaltemp = false;
1918 
1919 	/*
1920 	 * initialize relation tuple form
1921 	 *
1922 	 * The data we insert here is pretty incomplete/bogus, but it'll serve to
1923 	 * get us launched.  RelationCacheInitializePhase3() will read the real
1924 	 * data from pg_class and replace what we've done here.  Note in
1925 	 * particular that relowner is left as zero; this cues
1926 	 * RelationCacheInitializePhase3 that the real data isn't there yet.
1927 	 */
1928 	relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1929 
1930 	namestrcpy(&relation->rd_rel->relname, relationName);
1931 	relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1932 	relation->rd_rel->reltype = relationReltype;
1933 
1934 	/*
1935 	 * It's important to distinguish between shared and non-shared relations,
1936 	 * even at bootstrap time, to make sure we know where they are stored.
1937 	 */
1938 	relation->rd_rel->relisshared = isshared;
1939 	if (isshared)
1940 		relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
1941 
1942 	/* formrdesc is used only for permanent relations */
1943 	relation->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
1944 
1945 	/* ... and they're always populated, too */
1946 	relation->rd_rel->relispopulated = true;
1947 
1948 	relation->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
1949 	relation->rd_rel->relpages = 0;
1950 	relation->rd_rel->reltuples = 0;
1951 	relation->rd_rel->relallvisible = 0;
1952 	relation->rd_rel->relkind = RELKIND_RELATION;
1953 	relation->rd_rel->relnatts = (int16) natts;
1954 	relation->rd_rel->relam = HEAP_TABLE_AM_OID;
1955 
1956 	/*
1957 	 * initialize attribute tuple form
1958 	 *
1959 	 * Unlike the case with the relation tuple, this data had better be right
1960 	 * because it will never be replaced.  The data comes from
1961 	 * src/include/catalog/ headers via genbki.pl.
1962 	 */
1963 	relation->rd_att = CreateTemplateTupleDesc(natts);
1964 	relation->rd_att->tdrefcount = 1;	/* mark as refcounted */
1965 
1966 	relation->rd_att->tdtypeid = relationReltype;
1967 	relation->rd_att->tdtypmod = -1;	/* unnecessary, but... */
1968 
1969 	/*
1970 	 * initialize tuple desc info
1971 	 */
1972 	has_not_null = false;
1973 	for (i = 0; i < natts; i++)
1974 	{
1975 		memcpy(TupleDescAttr(relation->rd_att, i),
1976 			   &attrs[i],
1977 			   ATTRIBUTE_FIXED_PART_SIZE);
1978 		has_not_null |= attrs[i].attnotnull;
1979 		/* make sure attcacheoff is valid */
1980 		TupleDescAttr(relation->rd_att, i)->attcacheoff = -1;
1981 	}
1982 
1983 	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1984 	TupleDescAttr(relation->rd_att, 0)->attcacheoff = 0;
1985 
1986 	/* mark not-null status */
1987 	if (has_not_null)
1988 	{
1989 		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
1990 
1991 		constr->has_not_null = true;
1992 		relation->rd_att->constr = constr;
1993 	}
1994 
1995 	/*
1996 	 * initialize relation id from info in att array (my, this is ugly)
1997 	 */
1998 	RelationGetRelid(relation) = TupleDescAttr(relation->rd_att, 0)->attrelid;
1999 
2000 	/*
2001 	 * All relations made with formrdesc are mapped.  This is necessarily so
2002 	 * because there is no other way to know what filenode they currently
2003 	 * have.  In bootstrap mode, add them to the initial relation mapper data,
2004 	 * specifying that the initial filenode is the same as the OID.
2005 	 */
2006 	relation->rd_rel->relfilenode = InvalidOid;
2007 	if (IsBootstrapProcessingMode())
2008 		RelationMapUpdateMap(RelationGetRelid(relation),
2009 							 RelationGetRelid(relation),
2010 							 isshared, true);
2011 
2012 	/*
2013 	 * initialize the relation lock manager information
2014 	 */
2015 	RelationInitLockInfo(relation); /* see lmgr.c */
2016 
2017 	/*
2018 	 * initialize physical addressing information for the relation
2019 	 */
2020 	RelationInitPhysicalAddr(relation);
2021 
2022 	/*
2023 	 * initialize the table am handler
2024 	 */
2025 	relation->rd_rel->relam = HEAP_TABLE_AM_OID;
2026 	relation->rd_tableam = GetHeapamTableAmRoutine();
2027 
2028 	/*
2029 	 * initialize the rel-has-index flag, using hardwired knowledge
2030 	 */
2031 	if (IsBootstrapProcessingMode())
2032 	{
2033 		/* In bootstrap mode, we have no indexes */
2034 		relation->rd_rel->relhasindex = false;
2035 	}
2036 	else
2037 	{
2038 		/* Otherwise, all the rels formrdesc is used for have indexes */
2039 		relation->rd_rel->relhasindex = true;
2040 	}
2041 
2042 	/*
2043 	 * add new reldesc to relcache
2044 	 */
2045 	RelationCacheInsert(relation, false);
2046 
2047 	/* It's fully valid */
2048 	relation->rd_isvalid = true;
2049 }
2050 
2051 
2052 /* ----------------------------------------------------------------
2053  *				 Relation Descriptor Lookup Interface
2054  * ----------------------------------------------------------------
2055  */
2056 
2057 /*
2058  *		RelationIdGetRelation
2059  *
2060  *		Lookup a reldesc by OID; make one if not already in cache.
2061  *
2062  *		Returns NULL if no pg_class row could be found for the given relid
2063  *		(suggesting we are trying to access a just-deleted relation).
2064  *		Any other error is reported via elog.
2065  *
2066  *		NB: caller should already have at least AccessShareLock on the
2067  *		relation ID, else there are nasty race conditions.
2068  *
2069  *		NB: relation ref count is incremented, or set to 1 if new entry.
2070  *		Caller should eventually decrement count.  (Usually,
2071  *		that happens by calling RelationClose().)
2072  */
2073 Relation
RelationIdGetRelation(Oid relationId)2074 RelationIdGetRelation(Oid relationId)
2075 {
2076 	Relation	rd;
2077 
2078 	/* Make sure we're in an xact, even if this ends up being a cache hit */
2079 	Assert(IsTransactionState());
2080 
2081 	/*
2082 	 * first try to find reldesc in the cache
2083 	 */
2084 	RelationIdCacheLookup(relationId, rd);
2085 
2086 	if (RelationIsValid(rd))
2087 	{
2088 		/* return NULL for dropped relations */
2089 		if (rd->rd_droppedSubid != InvalidSubTransactionId)
2090 		{
2091 			Assert(!rd->rd_isvalid);
2092 			return NULL;
2093 		}
2094 
2095 		RelationIncrementReferenceCount(rd);
2096 		/* revalidate cache entry if necessary */
2097 		if (!rd->rd_isvalid)
2098 		{
2099 			/*
2100 			 * Indexes only have a limited number of possible schema changes,
2101 			 * and we don't want to use the full-blown procedure because it's
2102 			 * a headache for indexes that reload itself depends on.
2103 			 */
2104 			if (rd->rd_rel->relkind == RELKIND_INDEX ||
2105 				rd->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
2106 				RelationReloadIndexInfo(rd);
2107 			else
2108 				RelationClearRelation(rd, true);
2109 
2110 			/*
2111 			 * Normally entries need to be valid here, but before the relcache
2112 			 * has been initialized, not enough infrastructure exists to
2113 			 * perform pg_class lookups. The structure of such entries doesn't
2114 			 * change, but we still want to update the rd_rel entry. So
2115 			 * rd_isvalid = false is left in place for a later lookup.
2116 			 */
2117 			Assert(rd->rd_isvalid ||
2118 				   (rd->rd_isnailed && !criticalRelcachesBuilt));
2119 		}
2120 		return rd;
2121 	}
2122 
2123 	/*
2124 	 * no reldesc in the cache, so have RelationBuildDesc() build one and add
2125 	 * it.
2126 	 */
2127 	rd = RelationBuildDesc(relationId, true);
2128 	if (RelationIsValid(rd))
2129 		RelationIncrementReferenceCount(rd);
2130 	return rd;
2131 }
2132 
2133 /* ----------------------------------------------------------------
2134  *				cache invalidation support routines
2135  * ----------------------------------------------------------------
2136  */
2137 
2138 /*
2139  * RelationIncrementReferenceCount
2140  *		Increments relation reference count.
2141  *
2142  * Note: bootstrap mode has its own weird ideas about relation refcount
2143  * behavior; we ought to fix it someday, but for now, just disable
2144  * reference count ownership tracking in bootstrap mode.
2145  */
2146 void
RelationIncrementReferenceCount(Relation rel)2147 RelationIncrementReferenceCount(Relation rel)
2148 {
2149 	ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
2150 	rel->rd_refcnt += 1;
2151 	if (!IsBootstrapProcessingMode())
2152 		ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
2153 }
2154 
2155 /*
2156  * RelationDecrementReferenceCount
2157  *		Decrements relation reference count.
2158  */
2159 void
RelationDecrementReferenceCount(Relation rel)2160 RelationDecrementReferenceCount(Relation rel)
2161 {
2162 	Assert(rel->rd_refcnt > 0);
2163 	rel->rd_refcnt -= 1;
2164 	if (!IsBootstrapProcessingMode())
2165 		ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
2166 }
2167 
2168 /*
2169  * RelationClose - close an open relation
2170  *
2171  *	Actually, we just decrement the refcount.
2172  *
2173  *	NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
2174  *	will be freed as soon as their refcount goes to zero.  In combination
2175  *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
2176  *	to catch references to already-released relcache entries.  It slows
2177  *	things down quite a bit, however.
2178  */
2179 void
RelationClose(Relation relation)2180 RelationClose(Relation relation)
2181 {
2182 	/* Note: no locking manipulations needed */
2183 	RelationDecrementReferenceCount(relation);
2184 
2185 	/*
2186 	 * If the relation is no longer open in this session, we can clean up any
2187 	 * stale partition descriptors it has.  This is unlikely, so check to see
2188 	 * if there are child contexts before expending a call to mcxt.c.
2189 	 */
2190 	if (RelationHasReferenceCountZero(relation) &&
2191 		relation->rd_pdcxt != NULL &&
2192 		relation->rd_pdcxt->firstchild != NULL)
2193 		MemoryContextDeleteChildren(relation->rd_pdcxt);
2194 
2195 #ifdef RELCACHE_FORCE_RELEASE
2196 	if (RelationHasReferenceCountZero(relation) &&
2197 		relation->rd_createSubid == InvalidSubTransactionId &&
2198 		relation->rd_firstRelfilenodeSubid == InvalidSubTransactionId)
2199 		RelationClearRelation(relation, false);
2200 #endif
2201 }
2202 
2203 /*
2204  * RelationReloadIndexInfo - reload minimal information for an open index
2205  *
2206  *	This function is used only for indexes.  A relcache inval on an index
2207  *	can mean that its pg_class or pg_index row changed.  There are only
2208  *	very limited changes that are allowed to an existing index's schema,
2209  *	so we can update the relcache entry without a complete rebuild; which
2210  *	is fortunate because we can't rebuild an index entry that is "nailed"
2211  *	and/or in active use.  We support full replacement of the pg_class row,
2212  *	as well as updates of a few simple fields of the pg_index row.
2213  *
2214  *	We can't necessarily reread the catalog rows right away; we might be
2215  *	in a failed transaction when we receive the SI notification.  If so,
2216  *	RelationClearRelation just marks the entry as invalid by setting
2217  *	rd_isvalid to false.  This routine is called to fix the entry when it
2218  *	is next needed.
2219  *
2220  *	We assume that at the time we are called, we have at least AccessShareLock
2221  *	on the target index.  (Note: in the calls from RelationClearRelation,
2222  *	this is legitimate because we know the rel has positive refcount.)
2223  *
2224  *	If the target index is an index on pg_class or pg_index, we'd better have
2225  *	previously gotten at least AccessShareLock on its underlying catalog,
2226  *	else we are at risk of deadlock against someone trying to exclusive-lock
2227  *	the heap and index in that order.  This is ensured in current usage by
2228  *	only applying this to indexes being opened or having positive refcount.
2229  */
2230 static void
RelationReloadIndexInfo(Relation relation)2231 RelationReloadIndexInfo(Relation relation)
2232 {
2233 	bool		indexOK;
2234 	HeapTuple	pg_class_tuple;
2235 	Form_pg_class relp;
2236 
2237 	/* Should be called only for invalidated, live indexes */
2238 	Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
2239 			relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
2240 		   !relation->rd_isvalid &&
2241 		   relation->rd_droppedSubid == InvalidSubTransactionId);
2242 
2243 	/* Ensure it's closed at smgr level */
2244 	RelationCloseSmgr(relation);
2245 
2246 	/* Must free any AM cached data upon relcache flush */
2247 	if (relation->rd_amcache)
2248 		pfree(relation->rd_amcache);
2249 	relation->rd_amcache = NULL;
2250 
2251 	/*
2252 	 * If it's a shared index, we might be called before backend startup has
2253 	 * finished selecting a database, in which case we have no way to read
2254 	 * pg_class yet.  However, a shared index can never have any significant
2255 	 * schema updates, so it's okay to ignore the invalidation signal.  Just
2256 	 * mark it valid and return without doing anything more.
2257 	 */
2258 	if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
2259 	{
2260 		relation->rd_isvalid = true;
2261 		return;
2262 	}
2263 
2264 	/*
2265 	 * Read the pg_class row
2266 	 *
2267 	 * Don't try to use an indexscan of pg_class_oid_index to reload the info
2268 	 * for pg_class_oid_index ...
2269 	 */
2270 	indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
2271 	pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, false);
2272 	if (!HeapTupleIsValid(pg_class_tuple))
2273 		elog(ERROR, "could not find pg_class tuple for index %u",
2274 			 RelationGetRelid(relation));
2275 	relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
2276 	memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
2277 	/* Reload reloptions in case they changed */
2278 	if (relation->rd_options)
2279 		pfree(relation->rd_options);
2280 	RelationParseRelOptions(relation, pg_class_tuple);
2281 	/* done with pg_class tuple */
2282 	heap_freetuple(pg_class_tuple);
2283 	/* We must recalculate physical address in case it changed */
2284 	RelationInitPhysicalAddr(relation);
2285 
2286 	/*
2287 	 * For a non-system index, there are fields of the pg_index row that are
2288 	 * allowed to change, so re-read that row and update the relcache entry.
2289 	 * Most of the info derived from pg_index (such as support function lookup
2290 	 * info) cannot change, and indeed the whole point of this routine is to
2291 	 * update the relcache entry without clobbering that data; so wholesale
2292 	 * replacement is not appropriate.
2293 	 */
2294 	if (!IsSystemRelation(relation))
2295 	{
2296 		HeapTuple	tuple;
2297 		Form_pg_index index;
2298 
2299 		tuple = SearchSysCache1(INDEXRELID,
2300 								ObjectIdGetDatum(RelationGetRelid(relation)));
2301 		if (!HeapTupleIsValid(tuple))
2302 			elog(ERROR, "cache lookup failed for index %u",
2303 				 RelationGetRelid(relation));
2304 		index = (Form_pg_index) GETSTRUCT(tuple);
2305 
2306 		/*
2307 		 * Basically, let's just copy all the bool fields.  There are one or
2308 		 * two of these that can't actually change in the current code, but
2309 		 * it's not worth it to track exactly which ones they are.  None of
2310 		 * the array fields are allowed to change, though.
2311 		 */
2312 		relation->rd_index->indisunique = index->indisunique;
2313 		relation->rd_index->indisprimary = index->indisprimary;
2314 		relation->rd_index->indisexclusion = index->indisexclusion;
2315 		relation->rd_index->indimmediate = index->indimmediate;
2316 		relation->rd_index->indisclustered = index->indisclustered;
2317 		relation->rd_index->indisvalid = index->indisvalid;
2318 		relation->rd_index->indcheckxmin = index->indcheckxmin;
2319 		relation->rd_index->indisready = index->indisready;
2320 		relation->rd_index->indislive = index->indislive;
2321 
2322 		/* Copy xmin too, as that is needed to make sense of indcheckxmin */
2323 		HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
2324 							   HeapTupleHeaderGetXmin(tuple->t_data));
2325 
2326 		ReleaseSysCache(tuple);
2327 	}
2328 
2329 	/* Okay, now it's valid again */
2330 	relation->rd_isvalid = true;
2331 }
2332 
2333 /*
2334  * RelationReloadNailed - reload minimal information for nailed relations.
2335  *
2336  * The structure of a nailed relation can never change (which is good, because
2337  * we rely on knowing their structure to be able to read catalog content). But
2338  * some parts, e.g. pg_class.relfrozenxid, are still important to have
2339  * accurate content for. Therefore those need to be reloaded after the arrival
2340  * of invalidations.
2341  */
2342 static void
RelationReloadNailed(Relation relation)2343 RelationReloadNailed(Relation relation)
2344 {
2345 	Assert(relation->rd_isnailed);
2346 
2347 	/*
2348 	 * Redo RelationInitPhysicalAddr in case it is a mapped relation whose
2349 	 * mapping changed.
2350 	 */
2351 	RelationInitPhysicalAddr(relation);
2352 
2353 	/* flag as needing to be revalidated */
2354 	relation->rd_isvalid = false;
2355 
2356 	/*
2357 	 * Can only reread catalog contents if in a transaction.  If the relation
2358 	 * is currently open (not counting the nailed refcount), do so
2359 	 * immediately. Otherwise we've already marked the entry as possibly
2360 	 * invalid, and it'll be fixed when next opened.
2361 	 */
2362 	if (!IsTransactionState() || relation->rd_refcnt <= 1)
2363 		return;
2364 
2365 	if (relation->rd_rel->relkind == RELKIND_INDEX)
2366 	{
2367 		/*
2368 		 * If it's a nailed-but-not-mapped index, then we need to re-read the
2369 		 * pg_class row to see if its relfilenode changed.
2370 		 */
2371 		RelationReloadIndexInfo(relation);
2372 	}
2373 	else
2374 	{
2375 		/*
2376 		 * Reload a non-index entry.  We can't easily do so if relcaches
2377 		 * aren't yet built, but that's fine because at that stage the
2378 		 * attributes that need to be current (like relfrozenxid) aren't yet
2379 		 * accessed.  To ensure the entry will later be revalidated, we leave
2380 		 * it in invalid state, but allow use (cf. RelationIdGetRelation()).
2381 		 */
2382 		if (criticalRelcachesBuilt)
2383 		{
2384 			HeapTuple	pg_class_tuple;
2385 			Form_pg_class relp;
2386 
2387 			/*
2388 			 * NB: Mark the entry as valid before starting to scan, to avoid
2389 			 * self-recursion when re-building pg_class.
2390 			 */
2391 			relation->rd_isvalid = true;
2392 
2393 			pg_class_tuple = ScanPgRelation(RelationGetRelid(relation),
2394 											true, false);
2395 			relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
2396 			memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
2397 			heap_freetuple(pg_class_tuple);
2398 
2399 			/*
2400 			 * Again mark as valid, to protect against concurrently arriving
2401 			 * invalidations.
2402 			 */
2403 			relation->rd_isvalid = true;
2404 		}
2405 	}
2406 }
2407 
2408 /*
2409  * RelationDestroyRelation
2410  *
2411  *	Physically delete a relation cache entry and all subsidiary data.
2412  *	Caller must already have unhooked the entry from the hash table.
2413  */
2414 static void
RelationDestroyRelation(Relation relation,bool remember_tupdesc)2415 RelationDestroyRelation(Relation relation, bool remember_tupdesc)
2416 {
2417 	Assert(RelationHasReferenceCountZero(relation));
2418 
2419 	/*
2420 	 * Make sure smgr and lower levels close the relation's files, if they
2421 	 * weren't closed already.  (This was probably done by caller, but let's
2422 	 * just be real sure.)
2423 	 */
2424 	RelationCloseSmgr(relation);
2425 
2426 	/*
2427 	 * Free all the subsidiary data structures of the relcache entry, then the
2428 	 * entry itself.
2429 	 */
2430 	if (relation->rd_rel)
2431 		pfree(relation->rd_rel);
2432 	/* can't use DecrTupleDescRefCount here */
2433 	Assert(relation->rd_att->tdrefcount > 0);
2434 	if (--relation->rd_att->tdrefcount == 0)
2435 	{
2436 		/*
2437 		 * If we Rebuilt a relcache entry during a transaction then its
2438 		 * possible we did that because the TupDesc changed as the result of
2439 		 * an ALTER TABLE that ran at less than AccessExclusiveLock. It's
2440 		 * possible someone copied that TupDesc, in which case the copy would
2441 		 * point to free'd memory. So if we rebuild an entry we keep the
2442 		 * TupDesc around until end of transaction, to be safe.
2443 		 */
2444 		if (remember_tupdesc)
2445 			RememberToFreeTupleDescAtEOX(relation->rd_att);
2446 		else
2447 			FreeTupleDesc(relation->rd_att);
2448 	}
2449 	FreeTriggerDesc(relation->trigdesc);
2450 	list_free_deep(relation->rd_fkeylist);
2451 	list_free(relation->rd_indexlist);
2452 	list_free(relation->rd_statlist);
2453 	bms_free(relation->rd_indexattr);
2454 	bms_free(relation->rd_keyattr);
2455 	bms_free(relation->rd_pkattr);
2456 	bms_free(relation->rd_idattr);
2457 	if (relation->rd_pubactions)
2458 		pfree(relation->rd_pubactions);
2459 	if (relation->rd_options)
2460 		pfree(relation->rd_options);
2461 	if (relation->rd_indextuple)
2462 		pfree(relation->rd_indextuple);
2463 	if (relation->rd_amcache)
2464 		pfree(relation->rd_amcache);
2465 	if (relation->rd_fdwroutine)
2466 		pfree(relation->rd_fdwroutine);
2467 	if (relation->rd_indexcxt)
2468 		MemoryContextDelete(relation->rd_indexcxt);
2469 	if (relation->rd_rulescxt)
2470 		MemoryContextDelete(relation->rd_rulescxt);
2471 	if (relation->rd_rsdesc)
2472 		MemoryContextDelete(relation->rd_rsdesc->rscxt);
2473 	if (relation->rd_partkeycxt)
2474 		MemoryContextDelete(relation->rd_partkeycxt);
2475 	if (relation->rd_pdcxt)
2476 		MemoryContextDelete(relation->rd_pdcxt);
2477 	if (relation->rd_partcheckcxt)
2478 		MemoryContextDelete(relation->rd_partcheckcxt);
2479 	pfree(relation);
2480 }
2481 
2482 /*
2483  * RelationClearRelation
2484  *
2485  *	 Physically blow away a relation cache entry, or reset it and rebuild
2486  *	 it from scratch (that is, from catalog entries).  The latter path is
2487  *	 used when we are notified of a change to an open relation (one with
2488  *	 refcount > 0).
2489  *
2490  *	 NB: when rebuilding, we'd better hold some lock on the relation,
2491  *	 else the catalog data we need to read could be changing under us.
2492  *	 Also, a rel to be rebuilt had better have refcnt > 0.  This is because
2493  *	 a sinval reset could happen while we're accessing the catalogs, and
2494  *	 the rel would get blown away underneath us by RelationCacheInvalidate
2495  *	 if it has zero refcnt.
2496  *
2497  *	 The "rebuild" parameter is redundant in current usage because it has
2498  *	 to match the relation's refcnt status, but we keep it as a crosscheck
2499  *	 that we're doing what the caller expects.
2500  */
2501 static void
RelationClearRelation(Relation relation,bool rebuild)2502 RelationClearRelation(Relation relation, bool rebuild)
2503 {
2504 	/*
2505 	 * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while of
2506 	 * course it would be an equally bad idea to blow away one with nonzero
2507 	 * refcnt, since that would leave someone somewhere with a dangling
2508 	 * pointer.  All callers are expected to have verified that this holds.
2509 	 */
2510 	Assert(rebuild ?
2511 		   !RelationHasReferenceCountZero(relation) :
2512 		   RelationHasReferenceCountZero(relation));
2513 
2514 	/*
2515 	 * Make sure smgr and lower levels close the relation's files, if they
2516 	 * weren't closed already.  If the relation is not getting deleted, the
2517 	 * next smgr access should reopen the files automatically.  This ensures
2518 	 * that the low-level file access state is updated after, say, a vacuum
2519 	 * truncation.
2520 	 */
2521 	RelationCloseSmgr(relation);
2522 
2523 	/* Free AM cached data, if any */
2524 	if (relation->rd_amcache)
2525 		pfree(relation->rd_amcache);
2526 	relation->rd_amcache = NULL;
2527 
2528 	/*
2529 	 * Treat nailed-in system relations separately, they always need to be
2530 	 * accessible, so we can't blow them away.
2531 	 */
2532 	if (relation->rd_isnailed)
2533 	{
2534 		RelationReloadNailed(relation);
2535 		return;
2536 	}
2537 
2538 	/* Mark it invalid until we've finished rebuild */
2539 	relation->rd_isvalid = false;
2540 
2541 	/* See RelationForgetRelation(). */
2542 	if (relation->rd_droppedSubid != InvalidSubTransactionId)
2543 		return;
2544 
2545 	/*
2546 	 * Even non-system indexes should not be blown away if they are open and
2547 	 * have valid index support information.  This avoids problems with active
2548 	 * use of the index support information.  As with nailed indexes, we
2549 	 * re-read the pg_class row to handle possible physical relocation of the
2550 	 * index, and we check for pg_index updates too.
2551 	 */
2552 	if ((relation->rd_rel->relkind == RELKIND_INDEX ||
2553 		 relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
2554 		relation->rd_refcnt > 0 &&
2555 		relation->rd_indexcxt != NULL)
2556 	{
2557 		if (IsTransactionState())
2558 			RelationReloadIndexInfo(relation);
2559 		return;
2560 	}
2561 
2562 	/*
2563 	 * If we're really done with the relcache entry, blow it away. But if
2564 	 * someone is still using it, reconstruct the whole deal without moving
2565 	 * the physical RelationData record (so that the someone's pointer is
2566 	 * still valid).
2567 	 */
2568 	if (!rebuild)
2569 	{
2570 		/* Remove it from the hash table */
2571 		RelationCacheDelete(relation);
2572 
2573 		/* And release storage */
2574 		RelationDestroyRelation(relation, false);
2575 	}
2576 	else if (!IsTransactionState())
2577 	{
2578 		/*
2579 		 * If we're not inside a valid transaction, we can't do any catalog
2580 		 * access so it's not possible to rebuild yet.  Just exit, leaving
2581 		 * rd_isvalid = false so that the rebuild will occur when the entry is
2582 		 * next opened.
2583 		 *
2584 		 * Note: it's possible that we come here during subtransaction abort,
2585 		 * and the reason for wanting to rebuild is that the rel is open in
2586 		 * the outer transaction.  In that case it might seem unsafe to not
2587 		 * rebuild immediately, since whatever code has the rel already open
2588 		 * will keep on using the relcache entry as-is.  However, in such a
2589 		 * case the outer transaction should be holding a lock that's
2590 		 * sufficient to prevent any significant change in the rel's schema,
2591 		 * so the existing entry contents should be good enough for its
2592 		 * purposes; at worst we might be behind on statistics updates or the
2593 		 * like.  (See also CheckTableNotInUse() and its callers.)	These same
2594 		 * remarks also apply to the cases above where we exit without having
2595 		 * done RelationReloadIndexInfo() yet.
2596 		 */
2597 		return;
2598 	}
2599 	else
2600 	{
2601 		/*
2602 		 * Our strategy for rebuilding an open relcache entry is to build a
2603 		 * new entry from scratch, swap its contents with the old entry, and
2604 		 * finally delete the new entry (along with any infrastructure swapped
2605 		 * over from the old entry).  This is to avoid trouble in case an
2606 		 * error causes us to lose control partway through.  The old entry
2607 		 * will still be marked !rd_isvalid, so we'll try to rebuild it again
2608 		 * on next access.  Meanwhile it's not any less valid than it was
2609 		 * before, so any code that might expect to continue accessing it
2610 		 * isn't hurt by the rebuild failure.  (Consider for example a
2611 		 * subtransaction that ALTERs a table and then gets canceled partway
2612 		 * through the cache entry rebuild.  The outer transaction should
2613 		 * still see the not-modified cache entry as valid.)  The worst
2614 		 * consequence of an error is leaking the necessarily-unreferenced new
2615 		 * entry, and this shouldn't happen often enough for that to be a big
2616 		 * problem.
2617 		 *
2618 		 * When rebuilding an open relcache entry, we must preserve ref count,
2619 		 * rd_*Subid, and rd_toastoid state.  Also attempt to preserve the
2620 		 * pg_class entry (rd_rel), tupledesc, rewrite-rule, partition key,
2621 		 * and partition descriptor substructures in place, because various
2622 		 * places assume that these structures won't move while they are
2623 		 * working with an open relcache entry.  (Note:  the refcount
2624 		 * mechanism for tupledescs might someday allow us to remove this hack
2625 		 * for the tupledesc.)
2626 		 *
2627 		 * Note that this process does not touch CurrentResourceOwner; which
2628 		 * is good because whatever ref counts the entry may have do not
2629 		 * necessarily belong to that resource owner.
2630 		 */
2631 		Relation	newrel;
2632 		Oid			save_relid = RelationGetRelid(relation);
2633 		bool		keep_tupdesc;
2634 		bool		keep_rules;
2635 		bool		keep_policies;
2636 		bool		keep_partkey;
2637 
2638 		/* Build temporary entry, but don't link it into hashtable */
2639 		newrel = RelationBuildDesc(save_relid, false);
2640 
2641 		/*
2642 		 * Between here and the end of the swap, don't add code that does or
2643 		 * reasonably could read system catalogs.  That range must be free
2644 		 * from invalidation processing.  See RelationBuildDesc() manipulation
2645 		 * of in_progress_list.
2646 		 */
2647 
2648 		if (newrel == NULL)
2649 		{
2650 			/*
2651 			 * We can validly get here, if we're using a historic snapshot in
2652 			 * which a relation, accessed from outside logical decoding, is
2653 			 * still invisible. In that case it's fine to just mark the
2654 			 * relation as invalid and return - it'll fully get reloaded by
2655 			 * the cache reset at the end of logical decoding (or at the next
2656 			 * access).  During normal processing we don't want to ignore this
2657 			 * case as it shouldn't happen there, as explained below.
2658 			 */
2659 			if (HistoricSnapshotActive())
2660 				return;
2661 
2662 			/*
2663 			 * This shouldn't happen as dropping a relation is intended to be
2664 			 * impossible if still referenced (cf. CheckTableNotInUse()). But
2665 			 * if we get here anyway, we can't just delete the relcache entry,
2666 			 * as it possibly could get accessed later (as e.g. the error
2667 			 * might get trapped and handled via a subtransaction rollback).
2668 			 */
2669 			elog(ERROR, "relation %u deleted while still in use", save_relid);
2670 		}
2671 
2672 		keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
2673 		keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
2674 		keep_policies = equalRSDesc(relation->rd_rsdesc, newrel->rd_rsdesc);
2675 		/* partkey is immutable once set up, so we can always keep it */
2676 		keep_partkey = (relation->rd_partkey != NULL);
2677 
2678 		/*
2679 		 * Perform swapping of the relcache entry contents.  Within this
2680 		 * process the old entry is momentarily invalid, so there *must* be no
2681 		 * possibility of CHECK_FOR_INTERRUPTS within this sequence. Do it in
2682 		 * all-in-line code for safety.
2683 		 *
2684 		 * Since the vast majority of fields should be swapped, our method is
2685 		 * to swap the whole structures and then re-swap those few fields we
2686 		 * didn't want swapped.
2687 		 */
2688 #define SWAPFIELD(fldtype, fldname) \
2689 		do { \
2690 			fldtype _tmp = newrel->fldname; \
2691 			newrel->fldname = relation->fldname; \
2692 			relation->fldname = _tmp; \
2693 		} while (0)
2694 
2695 		/* swap all Relation struct fields */
2696 		{
2697 			RelationData tmpstruct;
2698 
2699 			memcpy(&tmpstruct, newrel, sizeof(RelationData));
2700 			memcpy(newrel, relation, sizeof(RelationData));
2701 			memcpy(relation, &tmpstruct, sizeof(RelationData));
2702 		}
2703 
2704 		/* rd_smgr must not be swapped, due to back-links from smgr level */
2705 		SWAPFIELD(SMgrRelation, rd_smgr);
2706 		/* rd_refcnt must be preserved */
2707 		SWAPFIELD(int, rd_refcnt);
2708 		/* isnailed shouldn't change */
2709 		Assert(newrel->rd_isnailed == relation->rd_isnailed);
2710 		/* creation sub-XIDs must be preserved */
2711 		SWAPFIELD(SubTransactionId, rd_createSubid);
2712 		SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
2713 		SWAPFIELD(SubTransactionId, rd_firstRelfilenodeSubid);
2714 		SWAPFIELD(SubTransactionId, rd_droppedSubid);
2715 		/* un-swap rd_rel pointers, swap contents instead */
2716 		SWAPFIELD(Form_pg_class, rd_rel);
2717 		/* ... but actually, we don't have to update newrel->rd_rel */
2718 		memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
2719 		/* preserve old tupledesc, rules, policies if no logical change */
2720 		if (keep_tupdesc)
2721 			SWAPFIELD(TupleDesc, rd_att);
2722 		if (keep_rules)
2723 		{
2724 			SWAPFIELD(RuleLock *, rd_rules);
2725 			SWAPFIELD(MemoryContext, rd_rulescxt);
2726 		}
2727 		if (keep_policies)
2728 			SWAPFIELD(RowSecurityDesc *, rd_rsdesc);
2729 		/* toast OID override must be preserved */
2730 		SWAPFIELD(Oid, rd_toastoid);
2731 		/* pgstat_info must be preserved */
2732 		SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);
2733 		/* preserve old partition key if we have one */
2734 		if (keep_partkey)
2735 		{
2736 			SWAPFIELD(PartitionKey, rd_partkey);
2737 			SWAPFIELD(MemoryContext, rd_partkeycxt);
2738 		}
2739 		if (newrel->rd_pdcxt != NULL)
2740 		{
2741 			/*
2742 			 * We are rebuilding a partitioned relation with a non-zero
2743 			 * reference count, so we must keep the old partition descriptor
2744 			 * around, in case there's a PartitionDirectory with a pointer to
2745 			 * it.  This means we can't free the old rd_pdcxt yet.  (This is
2746 			 * necessary because RelationGetPartitionDesc hands out direct
2747 			 * pointers to the relcache's data structure, unlike our usual
2748 			 * practice which is to hand out copies.  We'd have the same
2749 			 * problem with rd_partkey, except that we always preserve that
2750 			 * once created.)
2751 			 *
2752 			 * To ensure that it's not leaked completely, re-attach it to the
2753 			 * new reldesc, or make it a child of the new reldesc's rd_pdcxt
2754 			 * in the unlikely event that there is one already.  (Compare hack
2755 			 * in RelationBuildPartitionDesc.)  RelationClose will clean up
2756 			 * any such contexts once the reference count reaches zero.
2757 			 *
2758 			 * In the case where the reference count is zero, this code is not
2759 			 * reached, which should be OK because in that case there should
2760 			 * be no PartitionDirectory with a pointer to the old entry.
2761 			 *
2762 			 * Note that newrel and relation have already been swapped, so the
2763 			 * "old" partition descriptor is actually the one hanging off of
2764 			 * newrel.
2765 			 */
2766 			relation->rd_partdesc = NULL;	/* ensure rd_partdesc is invalid */
2767 			if (relation->rd_pdcxt != NULL) /* probably never happens */
2768 				MemoryContextSetParent(newrel->rd_pdcxt, relation->rd_pdcxt);
2769 			else
2770 				relation->rd_pdcxt = newrel->rd_pdcxt;
2771 			/* drop newrel's pointers so we don't destroy it below */
2772 			newrel->rd_partdesc = NULL;
2773 			newrel->rd_pdcxt = NULL;
2774 		}
2775 
2776 #undef SWAPFIELD
2777 
2778 		/* And now we can throw away the temporary entry */
2779 		RelationDestroyRelation(newrel, !keep_tupdesc);
2780 	}
2781 }
2782 
2783 /*
2784  * RelationFlushRelation
2785  *
2786  *	 Rebuild the relation if it is open (refcount > 0), else blow it away.
2787  *	 This is used when we receive a cache invalidation event for the rel.
2788  */
2789 static void
RelationFlushRelation(Relation relation)2790 RelationFlushRelation(Relation relation)
2791 {
2792 	if (relation->rd_createSubid != InvalidSubTransactionId ||
2793 		relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
2794 	{
2795 		/*
2796 		 * New relcache entries are always rebuilt, not flushed; else we'd
2797 		 * forget the "new" status of the relation.  Ditto for the
2798 		 * new-relfilenode status.
2799 		 *
2800 		 * The rel could have zero refcnt here, so temporarily increment the
2801 		 * refcnt to ensure it's safe to rebuild it.  We can assume that the
2802 		 * current transaction has some lock on the rel already.
2803 		 */
2804 		RelationIncrementReferenceCount(relation);
2805 		RelationClearRelation(relation, true);
2806 		RelationDecrementReferenceCount(relation);
2807 	}
2808 	else
2809 	{
2810 		/*
2811 		 * Pre-existing rels can be dropped from the relcache if not open.
2812 		 */
2813 		bool		rebuild = !RelationHasReferenceCountZero(relation);
2814 
2815 		RelationClearRelation(relation, rebuild);
2816 	}
2817 }
2818 
2819 /*
2820  * RelationForgetRelation - caller reports that it dropped the relation
2821  */
2822 void
RelationForgetRelation(Oid rid)2823 RelationForgetRelation(Oid rid)
2824 {
2825 	Relation	relation;
2826 
2827 	RelationIdCacheLookup(rid, relation);
2828 
2829 	if (!PointerIsValid(relation))
2830 		return;					/* not in cache, nothing to do */
2831 
2832 	if (!RelationHasReferenceCountZero(relation))
2833 		elog(ERROR, "relation %u is still open", rid);
2834 
2835 	Assert(relation->rd_droppedSubid == InvalidSubTransactionId);
2836 	if (relation->rd_createSubid != InvalidSubTransactionId ||
2837 		relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
2838 	{
2839 		/*
2840 		 * In the event of subtransaction rollback, we must not forget
2841 		 * rd_*Subid.  Mark the entry "dropped" so RelationClearRelation()
2842 		 * invalidates it in lieu of destroying it.  (If we're in a top
2843 		 * transaction, we could opt to destroy the entry.)
2844 		 */
2845 		relation->rd_droppedSubid = GetCurrentSubTransactionId();
2846 	}
2847 
2848 	RelationClearRelation(relation, false);
2849 }
2850 
2851 /*
2852  *		RelationCacheInvalidateEntry
2853  *
2854  *		This routine is invoked for SI cache flush messages.
2855  *
2856  * Any relcache entry matching the relid must be flushed.  (Note: caller has
2857  * already determined that the relid belongs to our database or is a shared
2858  * relation.)
2859  *
2860  * We used to skip local relations, on the grounds that they could
2861  * not be targets of cross-backend SI update messages; but it seems
2862  * safer to process them, so that our *own* SI update messages will
2863  * have the same effects during CommandCounterIncrement for both
2864  * local and nonlocal relations.
2865  */
2866 void
RelationCacheInvalidateEntry(Oid relationId)2867 RelationCacheInvalidateEntry(Oid relationId)
2868 {
2869 	Relation	relation;
2870 
2871 	RelationIdCacheLookup(relationId, relation);
2872 
2873 	if (PointerIsValid(relation))
2874 	{
2875 		relcacheInvalsReceived++;
2876 		RelationFlushRelation(relation);
2877 	}
2878 	else
2879 	{
2880 		int			i;
2881 
2882 		for (i = 0; i < in_progress_list_len; i++)
2883 			if (in_progress_list[i].reloid == relationId)
2884 				in_progress_list[i].invalidated = true;
2885 	}
2886 }
2887 
2888 /*
2889  * RelationCacheInvalidate
2890  *	 Blow away cached relation descriptors that have zero reference counts,
2891  *	 and rebuild those with positive reference counts.  Also reset the smgr
2892  *	 relation cache and re-read relation mapping data.
2893  *
2894  *	 Apart from debug_discard_caches, this is currently used only to recover
2895  *	 from SI message buffer overflow, so we do not touch relations having
2896  *	 new-in-transaction relfilenodes; they cannot be targets of cross-backend
2897  *	 SI updates (and our own updates now go through a separate linked list
2898  *	 that isn't limited by the SI message buffer size).
2899  *
2900  *	 We do this in two phases: the first pass deletes deletable items, and
2901  *	 the second one rebuilds the rebuildable items.  This is essential for
2902  *	 safety, because hash_seq_search only copes with concurrent deletion of
2903  *	 the element it is currently visiting.  If a second SI overflow were to
2904  *	 occur while we are walking the table, resulting in recursive entry to
2905  *	 this routine, we could crash because the inner invocation blows away
2906  *	 the entry next to be visited by the outer scan.  But this way is OK,
2907  *	 because (a) during the first pass we won't process any more SI messages,
2908  *	 so hash_seq_search will complete safely; (b) during the second pass we
2909  *	 only hold onto pointers to nondeletable entries.
2910  *
2911  *	 The two-phase approach also makes it easy to update relfilenodes for
2912  *	 mapped relations before we do anything else, and to ensure that the
2913  *	 second pass processes nailed-in-cache items before other nondeletable
2914  *	 items.  This should ensure that system catalogs are up to date before
2915  *	 we attempt to use them to reload information about other open relations.
2916  *
2917  *	 After those two phases of work having immediate effects, we normally
2918  *	 signal any RelationBuildDesc() on the stack to start over.  However, we
2919  *	 don't do this if called as part of debug_discard_caches.  Otherwise,
2920  *	 RelationBuildDesc() would become an infinite loop.
2921  */
2922 void
RelationCacheInvalidate(bool debug_discard)2923 RelationCacheInvalidate(bool debug_discard)
2924 {
2925 	HASH_SEQ_STATUS status;
2926 	RelIdCacheEnt *idhentry;
2927 	Relation	relation;
2928 	List	   *rebuildFirstList = NIL;
2929 	List	   *rebuildList = NIL;
2930 	ListCell   *l;
2931 	int			i;
2932 
2933 	/*
2934 	 * Reload relation mapping data before starting to reconstruct cache.
2935 	 */
2936 	RelationMapInvalidateAll();
2937 
2938 	/* Phase 1 */
2939 	hash_seq_init(&status, RelationIdCache);
2940 
2941 	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2942 	{
2943 		relation = idhentry->reldesc;
2944 
2945 		/* Must close all smgr references to avoid leaving dangling ptrs */
2946 		RelationCloseSmgr(relation);
2947 
2948 		/*
2949 		 * Ignore new relations; no other backend will manipulate them before
2950 		 * we commit.  Likewise, before replacing a relation's relfilenode, we
2951 		 * shall have acquired AccessExclusiveLock and drained any applicable
2952 		 * pending invalidations.
2953 		 */
2954 		if (relation->rd_createSubid != InvalidSubTransactionId ||
2955 			relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
2956 			continue;
2957 
2958 		relcacheInvalsReceived++;
2959 
2960 		if (RelationHasReferenceCountZero(relation))
2961 		{
2962 			/* Delete this entry immediately */
2963 			Assert(!relation->rd_isnailed);
2964 			RelationClearRelation(relation, false);
2965 		}
2966 		else
2967 		{
2968 			/*
2969 			 * If it's a mapped relation, immediately update its rd_node in
2970 			 * case its relfilenode changed.  We must do this during phase 1
2971 			 * in case the relation is consulted during rebuild of other
2972 			 * relcache entries in phase 2.  It's safe since consulting the
2973 			 * map doesn't involve any access to relcache entries.
2974 			 */
2975 			if (RelationIsMapped(relation))
2976 				RelationInitPhysicalAddr(relation);
2977 
2978 			/*
2979 			 * Add this entry to list of stuff to rebuild in second pass.
2980 			 * pg_class goes to the front of rebuildFirstList while
2981 			 * pg_class_oid_index goes to the back of rebuildFirstList, so
2982 			 * they are done first and second respectively.  Other nailed
2983 			 * relations go to the front of rebuildList, so they'll be done
2984 			 * next in no particular order; and everything else goes to the
2985 			 * back of rebuildList.
2986 			 */
2987 			if (RelationGetRelid(relation) == RelationRelationId)
2988 				rebuildFirstList = lcons(relation, rebuildFirstList);
2989 			else if (RelationGetRelid(relation) == ClassOidIndexId)
2990 				rebuildFirstList = lappend(rebuildFirstList, relation);
2991 			else if (relation->rd_isnailed)
2992 				rebuildList = lcons(relation, rebuildList);
2993 			else
2994 				rebuildList = lappend(rebuildList, relation);
2995 		}
2996 	}
2997 
2998 	/*
2999 	 * Now zap any remaining smgr cache entries.  This must happen before we
3000 	 * start to rebuild entries, since that may involve catalog fetches which
3001 	 * will re-open catalog files.
3002 	 */
3003 	smgrcloseall();
3004 
3005 	/* Phase 2: rebuild the items found to need rebuild in phase 1 */
3006 	foreach(l, rebuildFirstList)
3007 	{
3008 		relation = (Relation) lfirst(l);
3009 		RelationClearRelation(relation, true);
3010 	}
3011 	list_free(rebuildFirstList);
3012 	foreach(l, rebuildList)
3013 	{
3014 		relation = (Relation) lfirst(l);
3015 		RelationClearRelation(relation, true);
3016 	}
3017 	list_free(rebuildList);
3018 
3019 	if (!debug_discard)
3020 		/* Any RelationBuildDesc() on the stack must start over. */
3021 		for (i = 0; i < in_progress_list_len; i++)
3022 			in_progress_list[i].invalidated = true;
3023 }
3024 
3025 /*
3026  * RelationCloseSmgrByOid - close a relcache entry's smgr link
3027  *
3028  * Needed in some cases where we are changing a relation's physical mapping.
3029  * The link will be automatically reopened on next use.
3030  */
3031 void
RelationCloseSmgrByOid(Oid relationId)3032 RelationCloseSmgrByOid(Oid relationId)
3033 {
3034 	Relation	relation;
3035 
3036 	RelationIdCacheLookup(relationId, relation);
3037 
3038 	if (!PointerIsValid(relation))
3039 		return;					/* not in cache, nothing to do */
3040 
3041 	RelationCloseSmgr(relation);
3042 }
3043 
3044 static void
RememberToFreeTupleDescAtEOX(TupleDesc td)3045 RememberToFreeTupleDescAtEOX(TupleDesc td)
3046 {
3047 	if (EOXactTupleDescArray == NULL)
3048 	{
3049 		MemoryContext oldcxt;
3050 
3051 		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3052 
3053 		EOXactTupleDescArray = (TupleDesc *) palloc(16 * sizeof(TupleDesc));
3054 		EOXactTupleDescArrayLen = 16;
3055 		NextEOXactTupleDescNum = 0;
3056 		MemoryContextSwitchTo(oldcxt);
3057 	}
3058 	else if (NextEOXactTupleDescNum >= EOXactTupleDescArrayLen)
3059 	{
3060 		int32		newlen = EOXactTupleDescArrayLen * 2;
3061 
3062 		Assert(EOXactTupleDescArrayLen > 0);
3063 
3064 		EOXactTupleDescArray = (TupleDesc *) repalloc(EOXactTupleDescArray,
3065 													  newlen * sizeof(TupleDesc));
3066 		EOXactTupleDescArrayLen = newlen;
3067 	}
3068 
3069 	EOXactTupleDescArray[NextEOXactTupleDescNum++] = td;
3070 }
3071 
3072 #ifdef USE_ASSERT_CHECKING
3073 static void
AssertPendingSyncConsistency(Relation relation)3074 AssertPendingSyncConsistency(Relation relation)
3075 {
3076 	bool		relcache_verdict =
3077 	relation->rd_rel->relpersistence == RELPERSISTENCE_PERMANENT &&
3078 	((relation->rd_createSubid != InvalidSubTransactionId &&
3079 	  RELKIND_HAS_STORAGE(relation->rd_rel->relkind)) ||
3080 	 relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId);
3081 
3082 	Assert(relcache_verdict == RelFileNodeSkippingWAL(relation->rd_node));
3083 
3084 	if (relation->rd_droppedSubid != InvalidSubTransactionId)
3085 		Assert(!relation->rd_isvalid &&
3086 			   (relation->rd_createSubid != InvalidSubTransactionId ||
3087 				relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId));
3088 }
3089 
3090 /*
3091  * AssertPendingSyncs_RelationCache
3092  *
3093  *	Assert that relcache.c and storage.c agree on whether to skip WAL.
3094  */
3095 void
AssertPendingSyncs_RelationCache(void)3096 AssertPendingSyncs_RelationCache(void)
3097 {
3098 	HASH_SEQ_STATUS status;
3099 	LOCALLOCK  *locallock;
3100 	Relation   *rels;
3101 	int			maxrels;
3102 	int			nrels;
3103 	RelIdCacheEnt *idhentry;
3104 	int			i;
3105 
3106 	/*
3107 	 * Open every relation that this transaction has locked.  If, for some
3108 	 * relation, storage.c is skipping WAL and relcache.c is not skipping WAL,
3109 	 * a CommandCounterIncrement() typically yields a local invalidation
3110 	 * message that destroys the relcache entry.  By recreating such entries
3111 	 * here, we detect the problem.
3112 	 */
3113 	PushActiveSnapshot(GetTransactionSnapshot());
3114 	maxrels = 1;
3115 	rels = palloc(maxrels * sizeof(*rels));
3116 	nrels = 0;
3117 	hash_seq_init(&status, GetLockMethodLocalHash());
3118 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3119 	{
3120 		Oid			relid;
3121 		Relation	r;
3122 
3123 		if (locallock->nLocks <= 0)
3124 			continue;
3125 		if ((LockTagType) locallock->tag.lock.locktag_type !=
3126 			LOCKTAG_RELATION)
3127 			continue;
3128 		relid = ObjectIdGetDatum(locallock->tag.lock.locktag_field2);
3129 		r = RelationIdGetRelation(relid);
3130 		if (!RelationIsValid(r))
3131 			continue;
3132 		if (nrels >= maxrels)
3133 		{
3134 			maxrels *= 2;
3135 			rels = repalloc(rels, maxrels * sizeof(*rels));
3136 		}
3137 		rels[nrels++] = r;
3138 	}
3139 
3140 	hash_seq_init(&status, RelationIdCache);
3141 	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3142 		AssertPendingSyncConsistency(idhentry->reldesc);
3143 
3144 	for (i = 0; i < nrels; i++)
3145 		RelationClose(rels[i]);
3146 	PopActiveSnapshot();
3147 }
3148 #endif
3149 
3150 /*
3151  * AtEOXact_RelationCache
3152  *
3153  *	Clean up the relcache at main-transaction commit or abort.
3154  *
3155  * Note: this must be called *before* processing invalidation messages.
3156  * In the case of abort, we don't want to try to rebuild any invalidated
3157  * cache entries (since we can't safely do database accesses).  Therefore
3158  * we must reset refcnts before handling pending invalidations.
3159  *
3160  * As of PostgreSQL 8.1, relcache refcnts should get released by the
3161  * ResourceOwner mechanism.  This routine just does a debugging
3162  * cross-check that no pins remain.  However, we also need to do special
3163  * cleanup when the current transaction created any relations or made use
3164  * of forced index lists.
3165  */
3166 void
AtEOXact_RelationCache(bool isCommit)3167 AtEOXact_RelationCache(bool isCommit)
3168 {
3169 	HASH_SEQ_STATUS status;
3170 	RelIdCacheEnt *idhentry;
3171 	int			i;
3172 
3173 	/*
3174 	 * Forget in_progress_list.  This is relevant when we're aborting due to
3175 	 * an error during RelationBuildDesc().
3176 	 */
3177 	Assert(in_progress_list_len == 0 || !isCommit);
3178 	in_progress_list_len = 0;
3179 
3180 	/*
3181 	 * Unless the eoxact_list[] overflowed, we only need to examine the rels
3182 	 * listed in it.  Otherwise fall back on a hash_seq_search scan.
3183 	 *
3184 	 * For simplicity, eoxact_list[] entries are not deleted till end of
3185 	 * top-level transaction, even though we could remove them at
3186 	 * subtransaction end in some cases, or remove relations from the list if
3187 	 * they are cleared for other reasons.  Therefore we should expect the
3188 	 * case that list entries are not found in the hashtable; if not, there's
3189 	 * nothing to do for them.
3190 	 */
3191 	if (eoxact_list_overflowed)
3192 	{
3193 		hash_seq_init(&status, RelationIdCache);
3194 		while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3195 		{
3196 			AtEOXact_cleanup(idhentry->reldesc, isCommit);
3197 		}
3198 	}
3199 	else
3200 	{
3201 		for (i = 0; i < eoxact_list_len; i++)
3202 		{
3203 			idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
3204 													 (void *) &eoxact_list[i],
3205 													 HASH_FIND,
3206 													 NULL);
3207 			if (idhentry != NULL)
3208 				AtEOXact_cleanup(idhentry->reldesc, isCommit);
3209 		}
3210 	}
3211 
3212 	if (EOXactTupleDescArrayLen > 0)
3213 	{
3214 		Assert(EOXactTupleDescArray != NULL);
3215 		for (i = 0; i < NextEOXactTupleDescNum; i++)
3216 			FreeTupleDesc(EOXactTupleDescArray[i]);
3217 		pfree(EOXactTupleDescArray);
3218 		EOXactTupleDescArray = NULL;
3219 	}
3220 
3221 	/* Now we're out of the transaction and can clear the lists */
3222 	eoxact_list_len = 0;
3223 	eoxact_list_overflowed = false;
3224 	NextEOXactTupleDescNum = 0;
3225 	EOXactTupleDescArrayLen = 0;
3226 }
3227 
3228 /*
3229  * AtEOXact_cleanup
3230  *
3231  *	Clean up a single rel at main-transaction commit or abort
3232  *
3233  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
3234  * bother to prevent duplicate entries in eoxact_list[].
3235  */
3236 static void
AtEOXact_cleanup(Relation relation,bool isCommit)3237 AtEOXact_cleanup(Relation relation, bool isCommit)
3238 {
3239 	bool		clear_relcache = false;
3240 
3241 	/*
3242 	 * The relcache entry's ref count should be back to its normal
3243 	 * not-in-a-transaction state: 0 unless it's nailed in cache.
3244 	 *
3245 	 * In bootstrap mode, this is NOT true, so don't check it --- the
3246 	 * bootstrap code expects relations to stay open across start/commit
3247 	 * transaction calls.  (That seems bogus, but it's not worth fixing.)
3248 	 *
3249 	 * Note: ideally this check would be applied to every relcache entry, not
3250 	 * just those that have eoxact work to do.  But it's not worth forcing a
3251 	 * scan of the whole relcache just for this.  (Moreover, doing so would
3252 	 * mean that assert-enabled testing never tests the hash_search code path
3253 	 * above, which seems a bad idea.)
3254 	 */
3255 #ifdef USE_ASSERT_CHECKING
3256 	if (!IsBootstrapProcessingMode())
3257 	{
3258 		int			expected_refcnt;
3259 
3260 		expected_refcnt = relation->rd_isnailed ? 1 : 0;
3261 		Assert(relation->rd_refcnt == expected_refcnt);
3262 	}
3263 #endif
3264 
3265 	/*
3266 	 * Is the relation live after this transaction ends?
3267 	 *
3268 	 * During commit, clear the relcache entry if it is preserved after
3269 	 * relation drop, in order not to orphan the entry.  During rollback,
3270 	 * clear the relcache entry if the relation is created in the current
3271 	 * transaction since it isn't interesting any longer once we are out of
3272 	 * the transaction.
3273 	 */
3274 	clear_relcache =
3275 		(isCommit ?
3276 		 relation->rd_droppedSubid != InvalidSubTransactionId :
3277 		 relation->rd_createSubid != InvalidSubTransactionId);
3278 
3279 	/*
3280 	 * Since we are now out of the transaction, reset the subids to zero. That
3281 	 * also lets RelationClearRelation() drop the relcache entry.
3282 	 */
3283 	relation->rd_createSubid = InvalidSubTransactionId;
3284 	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3285 	relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
3286 	relation->rd_droppedSubid = InvalidSubTransactionId;
3287 
3288 	if (clear_relcache)
3289 	{
3290 		if (RelationHasReferenceCountZero(relation))
3291 		{
3292 			RelationClearRelation(relation, false);
3293 			return;
3294 		}
3295 		else
3296 		{
3297 			/*
3298 			 * Hmm, somewhere there's a (leaked?) reference to the relation.
3299 			 * We daren't remove the entry for fear of dereferencing a
3300 			 * dangling pointer later.  Bleat, and mark it as not belonging to
3301 			 * the current transaction.  Hopefully it'll get cleaned up
3302 			 * eventually.  This must be just a WARNING to avoid
3303 			 * error-during-error-recovery loops.
3304 			 */
3305 			elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
3306 				 RelationGetRelationName(relation));
3307 		}
3308 	}
3309 }
3310 
3311 /*
3312  * AtEOSubXact_RelationCache
3313  *
3314  *	Clean up the relcache at sub-transaction commit or abort.
3315  *
3316  * Note: this must be called *before* processing invalidation messages.
3317  */
3318 void
AtEOSubXact_RelationCache(bool isCommit,SubTransactionId mySubid,SubTransactionId parentSubid)3319 AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
3320 						  SubTransactionId parentSubid)
3321 {
3322 	HASH_SEQ_STATUS status;
3323 	RelIdCacheEnt *idhentry;
3324 	int			i;
3325 
3326 	/*
3327 	 * Forget in_progress_list.  This is relevant when we're aborting due to
3328 	 * an error during RelationBuildDesc().  We don't commit subtransactions
3329 	 * during RelationBuildDesc().
3330 	 */
3331 	Assert(in_progress_list_len == 0 || !isCommit);
3332 	in_progress_list_len = 0;
3333 
3334 	/*
3335 	 * Unless the eoxact_list[] overflowed, we only need to examine the rels
3336 	 * listed in it.  Otherwise fall back on a hash_seq_search scan.  Same
3337 	 * logic as in AtEOXact_RelationCache.
3338 	 */
3339 	if (eoxact_list_overflowed)
3340 	{
3341 		hash_seq_init(&status, RelationIdCache);
3342 		while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3343 		{
3344 			AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
3345 								mySubid, parentSubid);
3346 		}
3347 	}
3348 	else
3349 	{
3350 		for (i = 0; i < eoxact_list_len; i++)
3351 		{
3352 			idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
3353 													 (void *) &eoxact_list[i],
3354 													 HASH_FIND,
3355 													 NULL);
3356 			if (idhentry != NULL)
3357 				AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
3358 									mySubid, parentSubid);
3359 		}
3360 	}
3361 
3362 	/* Don't reset the list; we still need more cleanup later */
3363 }
3364 
3365 /*
3366  * AtEOSubXact_cleanup
3367  *
3368  *	Clean up a single rel at subtransaction commit or abort
3369  *
3370  * NB: this processing must be idempotent, because EOXactListAdd() doesn't
3371  * bother to prevent duplicate entries in eoxact_list[].
3372  */
3373 static void
AtEOSubXact_cleanup(Relation relation,bool isCommit,SubTransactionId mySubid,SubTransactionId parentSubid)3374 AtEOSubXact_cleanup(Relation relation, bool isCommit,
3375 					SubTransactionId mySubid, SubTransactionId parentSubid)
3376 {
3377 	/*
3378 	 * Is it a relation created in the current subtransaction?
3379 	 *
3380 	 * During subcommit, mark it as belonging to the parent, instead, as long
3381 	 * as it has not been dropped. Otherwise simply delete the relcache entry.
3382 	 * --- it isn't interesting any longer.
3383 	 */
3384 	if (relation->rd_createSubid == mySubid)
3385 	{
3386 		/*
3387 		 * Valid rd_droppedSubid means the corresponding relation is dropped
3388 		 * but the relcache entry is preserved for at-commit pending sync. We
3389 		 * need to drop it explicitly here not to make the entry orphan.
3390 		 */
3391 		Assert(relation->rd_droppedSubid == mySubid ||
3392 			   relation->rd_droppedSubid == InvalidSubTransactionId);
3393 		if (isCommit && relation->rd_droppedSubid == InvalidSubTransactionId)
3394 			relation->rd_createSubid = parentSubid;
3395 		else if (RelationHasReferenceCountZero(relation))
3396 		{
3397 			/* allow the entry to be removed */
3398 			relation->rd_createSubid = InvalidSubTransactionId;
3399 			relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3400 			relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
3401 			relation->rd_droppedSubid = InvalidSubTransactionId;
3402 			RelationClearRelation(relation, false);
3403 			return;
3404 		}
3405 		else
3406 		{
3407 			/*
3408 			 * Hmm, somewhere there's a (leaked?) reference to the relation.
3409 			 * We daren't remove the entry for fear of dereferencing a
3410 			 * dangling pointer later.  Bleat, and transfer it to the parent
3411 			 * subtransaction so we can try again later.  This must be just a
3412 			 * WARNING to avoid error-during-error-recovery loops.
3413 			 */
3414 			relation->rd_createSubid = parentSubid;
3415 			elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
3416 				 RelationGetRelationName(relation));
3417 		}
3418 	}
3419 
3420 	/*
3421 	 * Likewise, update or drop any new-relfilenode-in-subtransaction record
3422 	 * or drop record.
3423 	 */
3424 	if (relation->rd_newRelfilenodeSubid == mySubid)
3425 	{
3426 		if (isCommit)
3427 			relation->rd_newRelfilenodeSubid = parentSubid;
3428 		else
3429 			relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3430 	}
3431 
3432 	if (relation->rd_firstRelfilenodeSubid == mySubid)
3433 	{
3434 		if (isCommit)
3435 			relation->rd_firstRelfilenodeSubid = parentSubid;
3436 		else
3437 			relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
3438 	}
3439 
3440 	if (relation->rd_droppedSubid == mySubid)
3441 	{
3442 		if (isCommit)
3443 			relation->rd_droppedSubid = parentSubid;
3444 		else
3445 			relation->rd_droppedSubid = InvalidSubTransactionId;
3446 	}
3447 }
3448 
3449 
3450 /*
3451  *		RelationBuildLocalRelation
3452  *			Build a relcache entry for an about-to-be-created relation,
3453  *			and enter it into the relcache.
3454  */
3455 Relation
RelationBuildLocalRelation(const char * relname,Oid relnamespace,TupleDesc tupDesc,Oid relid,Oid accessmtd,Oid relfilenode,Oid reltablespace,bool shared_relation,bool mapped_relation,char relpersistence,char relkind)3456 RelationBuildLocalRelation(const char *relname,
3457 						   Oid relnamespace,
3458 						   TupleDesc tupDesc,
3459 						   Oid relid,
3460 						   Oid accessmtd,
3461 						   Oid relfilenode,
3462 						   Oid reltablespace,
3463 						   bool shared_relation,
3464 						   bool mapped_relation,
3465 						   char relpersistence,
3466 						   char relkind)
3467 {
3468 	Relation	rel;
3469 	MemoryContext oldcxt;
3470 	int			natts = tupDesc->natts;
3471 	int			i;
3472 	bool		has_not_null;
3473 	bool		nailit;
3474 
3475 	AssertArg(natts >= 0);
3476 
3477 	/*
3478 	 * check for creation of a rel that must be nailed in cache.
3479 	 *
3480 	 * XXX this list had better match the relations specially handled in
3481 	 * RelationCacheInitializePhase2/3.
3482 	 */
3483 	switch (relid)
3484 	{
3485 		case DatabaseRelationId:
3486 		case AuthIdRelationId:
3487 		case AuthMemRelationId:
3488 		case RelationRelationId:
3489 		case AttributeRelationId:
3490 		case ProcedureRelationId:
3491 		case TypeRelationId:
3492 			nailit = true;
3493 			break;
3494 		default:
3495 			nailit = false;
3496 			break;
3497 	}
3498 
3499 	/*
3500 	 * check that hardwired list of shared rels matches what's in the
3501 	 * bootstrap .bki file.  If you get a failure here during initdb, you
3502 	 * probably need to fix IsSharedRelation() to match whatever you've done
3503 	 * to the set of shared relations.
3504 	 */
3505 	if (shared_relation != IsSharedRelation(relid))
3506 		elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
3507 			 relname, relid);
3508 
3509 	/* Shared relations had better be mapped, too */
3510 	Assert(mapped_relation || !shared_relation);
3511 
3512 	/*
3513 	 * switch to the cache context to create the relcache entry.
3514 	 */
3515 	if (!CacheMemoryContext)
3516 		CreateCacheMemoryContext();
3517 
3518 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3519 
3520 	/*
3521 	 * allocate a new relation descriptor and fill in basic state fields.
3522 	 */
3523 	rel = (Relation) palloc0(sizeof(RelationData));
3524 
3525 	/* make sure relation is marked as having no open file yet */
3526 	rel->rd_smgr = NULL;
3527 
3528 	/* mark it nailed if appropriate */
3529 	rel->rd_isnailed = nailit;
3530 
3531 	rel->rd_refcnt = nailit ? 1 : 0;
3532 
3533 	/* it's being created in this transaction */
3534 	rel->rd_createSubid = GetCurrentSubTransactionId();
3535 	rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3536 	rel->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
3537 	rel->rd_droppedSubid = InvalidSubTransactionId;
3538 
3539 	/*
3540 	 * create a new tuple descriptor from the one passed in.  We do this
3541 	 * partly to copy it into the cache context, and partly because the new
3542 	 * relation can't have any defaults or constraints yet; they have to be
3543 	 * added in later steps, because they require additions to multiple system
3544 	 * catalogs.  We can copy attnotnull constraints here, however.
3545 	 */
3546 	rel->rd_att = CreateTupleDescCopy(tupDesc);
3547 	rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
3548 	has_not_null = false;
3549 	for (i = 0; i < natts; i++)
3550 	{
3551 		Form_pg_attribute satt = TupleDescAttr(tupDesc, i);
3552 		Form_pg_attribute datt = TupleDescAttr(rel->rd_att, i);
3553 
3554 		datt->attidentity = satt->attidentity;
3555 		datt->attgenerated = satt->attgenerated;
3556 		datt->attnotnull = satt->attnotnull;
3557 		has_not_null |= satt->attnotnull;
3558 	}
3559 
3560 	if (has_not_null)
3561 	{
3562 		TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
3563 
3564 		constr->has_not_null = true;
3565 		rel->rd_att->constr = constr;
3566 	}
3567 
3568 	/*
3569 	 * initialize relation tuple form (caller may add/override data later)
3570 	 */
3571 	rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
3572 
3573 	namestrcpy(&rel->rd_rel->relname, relname);
3574 	rel->rd_rel->relnamespace = relnamespace;
3575 
3576 	rel->rd_rel->relkind = relkind;
3577 	rel->rd_rel->relnatts = natts;
3578 	rel->rd_rel->reltype = InvalidOid;
3579 	/* needed when bootstrapping: */
3580 	rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
3581 
3582 	/* set up persistence and relcache fields dependent on it */
3583 	rel->rd_rel->relpersistence = relpersistence;
3584 	switch (relpersistence)
3585 	{
3586 		case RELPERSISTENCE_UNLOGGED:
3587 		case RELPERSISTENCE_PERMANENT:
3588 			rel->rd_backend = InvalidBackendId;
3589 			rel->rd_islocaltemp = false;
3590 			break;
3591 		case RELPERSISTENCE_TEMP:
3592 			Assert(isTempOrTempToastNamespace(relnamespace));
3593 			rel->rd_backend = BackendIdForTempRelations();
3594 			rel->rd_islocaltemp = true;
3595 			break;
3596 		default:
3597 			elog(ERROR, "invalid relpersistence: %c", relpersistence);
3598 			break;
3599 	}
3600 
3601 	/* if it's a materialized view, it's not populated initially */
3602 	if (relkind == RELKIND_MATVIEW)
3603 		rel->rd_rel->relispopulated = false;
3604 	else
3605 		rel->rd_rel->relispopulated = true;
3606 
3607 	/* set replica identity -- system catalogs and non-tables don't have one */
3608 	if (!IsCatalogNamespace(relnamespace) &&
3609 		(relkind == RELKIND_RELATION ||
3610 		 relkind == RELKIND_MATVIEW ||
3611 		 relkind == RELKIND_PARTITIONED_TABLE))
3612 		rel->rd_rel->relreplident = REPLICA_IDENTITY_DEFAULT;
3613 	else
3614 		rel->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
3615 
3616 	/*
3617 	 * Insert relation physical and logical identifiers (OIDs) into the right
3618 	 * places.  For a mapped relation, we set relfilenode to zero and rely on
3619 	 * RelationInitPhysicalAddr to consult the map.
3620 	 */
3621 	rel->rd_rel->relisshared = shared_relation;
3622 
3623 	RelationGetRelid(rel) = relid;
3624 
3625 	for (i = 0; i < natts; i++)
3626 		TupleDescAttr(rel->rd_att, i)->attrelid = relid;
3627 
3628 	rel->rd_rel->reltablespace = reltablespace;
3629 
3630 	if (mapped_relation)
3631 	{
3632 		rel->rd_rel->relfilenode = InvalidOid;
3633 		/* Add it to the active mapping information */
3634 		RelationMapUpdateMap(relid, relfilenode, shared_relation, true);
3635 	}
3636 	else
3637 		rel->rd_rel->relfilenode = relfilenode;
3638 
3639 	RelationInitLockInfo(rel);	/* see lmgr.c */
3640 
3641 	RelationInitPhysicalAddr(rel);
3642 
3643 	rel->rd_rel->relam = accessmtd;
3644 
3645 	/*
3646 	 * RelationInitTableAccessMethod will do syscache lookups, so we mustn't
3647 	 * run it in CacheMemoryContext.  Fortunately, the remaining steps don't
3648 	 * require a long-lived current context.
3649 	 */
3650 	MemoryContextSwitchTo(oldcxt);
3651 
3652 	if (relkind == RELKIND_RELATION ||
3653 		relkind == RELKIND_SEQUENCE ||
3654 		relkind == RELKIND_TOASTVALUE ||
3655 		relkind == RELKIND_MATVIEW)
3656 		RelationInitTableAccessMethod(rel);
3657 
3658 	/*
3659 	 * Okay to insert into the relcache hash table.
3660 	 *
3661 	 * Ordinarily, there should certainly not be an existing hash entry for
3662 	 * the same OID; but during bootstrap, when we create a "real" relcache
3663 	 * entry for one of the bootstrap relations, we'll be overwriting the
3664 	 * phony one created with formrdesc.  So allow that to happen for nailed
3665 	 * rels.
3666 	 */
3667 	RelationCacheInsert(rel, nailit);
3668 
3669 	/*
3670 	 * Flag relation as needing eoxact cleanup (to clear rd_createSubid). We
3671 	 * can't do this before storing relid in it.
3672 	 */
3673 	EOXactListAdd(rel);
3674 
3675 	/* It's fully valid */
3676 	rel->rd_isvalid = true;
3677 
3678 	/*
3679 	 * Caller expects us to pin the returned entry.
3680 	 */
3681 	RelationIncrementReferenceCount(rel);
3682 
3683 	return rel;
3684 }
3685 
3686 
3687 /*
3688  * RelationSetNewRelfilenode
3689  *
3690  * Assign a new relfilenode (physical file name), and possibly a new
3691  * persistence setting, to the relation.
3692  *
3693  * This allows a full rewrite of the relation to be done with transactional
3694  * safety (since the filenode assignment can be rolled back).  Note however
3695  * that there is no simple way to access the relation's old data for the
3696  * remainder of the current transaction.  This limits the usefulness to cases
3697  * such as TRUNCATE or rebuilding an index from scratch.
3698  *
3699  * Caller must already hold exclusive lock on the relation.
3700  */
3701 void
RelationSetNewRelfilenode(Relation relation,char persistence)3702 RelationSetNewRelfilenode(Relation relation, char persistence)
3703 {
3704 	Oid			newrelfilenode;
3705 	Relation	pg_class;
3706 	HeapTuple	tuple;
3707 	Form_pg_class classform;
3708 	MultiXactId minmulti = InvalidMultiXactId;
3709 	TransactionId freezeXid = InvalidTransactionId;
3710 	RelFileNode newrnode;
3711 
3712 	/* Allocate a new relfilenode */
3713 	newrelfilenode = GetNewRelFileNode(relation->rd_rel->reltablespace, NULL,
3714 									   persistence);
3715 
3716 	/*
3717 	 * Get a writable copy of the pg_class tuple for the given relation.
3718 	 */
3719 	pg_class = table_open(RelationRelationId, RowExclusiveLock);
3720 
3721 	tuple = SearchSysCacheCopy1(RELOID,
3722 								ObjectIdGetDatum(RelationGetRelid(relation)));
3723 	if (!HeapTupleIsValid(tuple))
3724 		elog(ERROR, "could not find tuple for relation %u",
3725 			 RelationGetRelid(relation));
3726 	classform = (Form_pg_class) GETSTRUCT(tuple);
3727 
3728 	/*
3729 	 * Schedule unlinking of the old storage at transaction commit.
3730 	 */
3731 	RelationDropStorage(relation);
3732 
3733 	/*
3734 	 * Create storage for the main fork of the new relfilenode.  If it's a
3735 	 * table-like object, call into the table AM to do so, which'll also
3736 	 * create the table's init fork if needed.
3737 	 *
3738 	 * NOTE: If relevant for the AM, any conflict in relfilenode value will be
3739 	 * caught here, if GetNewRelFileNode messes up for any reason.
3740 	 */
3741 	newrnode = relation->rd_node;
3742 	newrnode.relNode = newrelfilenode;
3743 
3744 	switch (relation->rd_rel->relkind)
3745 	{
3746 		case RELKIND_INDEX:
3747 		case RELKIND_SEQUENCE:
3748 			{
3749 				/* handle these directly, at least for now */
3750 				SMgrRelation srel;
3751 
3752 				srel = RelationCreateStorage(newrnode, persistence);
3753 				smgrclose(srel);
3754 			}
3755 			break;
3756 
3757 		case RELKIND_RELATION:
3758 		case RELKIND_TOASTVALUE:
3759 		case RELKIND_MATVIEW:
3760 			table_relation_set_new_filenode(relation, &newrnode,
3761 											persistence,
3762 											&freezeXid, &minmulti);
3763 			break;
3764 
3765 		default:
3766 			/* we shouldn't be called for anything else */
3767 			elog(ERROR, "relation \"%s\" does not have storage",
3768 				 RelationGetRelationName(relation));
3769 			break;
3770 	}
3771 
3772 	/*
3773 	 * If we're dealing with a mapped index, pg_class.relfilenode doesn't
3774 	 * change; instead we have to send the update to the relation mapper.
3775 	 *
3776 	 * For mapped indexes, we don't actually change the pg_class entry at all;
3777 	 * this is essential when reindexing pg_class itself.  That leaves us with
3778 	 * possibly-inaccurate values of relpages etc, but those will be fixed up
3779 	 * later.
3780 	 */
3781 	if (RelationIsMapped(relation))
3782 	{
3783 		/* This case is only supported for indexes */
3784 		Assert(relation->rd_rel->relkind == RELKIND_INDEX);
3785 
3786 		/* Since we're not updating pg_class, these had better not change */
3787 		Assert(classform->relfrozenxid == freezeXid);
3788 		Assert(classform->relminmxid == minmulti);
3789 		Assert(classform->relpersistence == persistence);
3790 
3791 		/*
3792 		 * In some code paths it's possible that the tuple update we'd
3793 		 * otherwise do here is the only thing that would assign an XID for
3794 		 * the current transaction.  However, we must have an XID to delete
3795 		 * files, so make sure one is assigned.
3796 		 */
3797 		(void) GetCurrentTransactionId();
3798 
3799 		/* Do the deed */
3800 		RelationMapUpdateMap(RelationGetRelid(relation),
3801 							 newrelfilenode,
3802 							 relation->rd_rel->relisshared,
3803 							 false);
3804 
3805 		/* Since we're not updating pg_class, must trigger inval manually */
3806 		CacheInvalidateRelcache(relation);
3807 	}
3808 	else
3809 	{
3810 		/* Normal case, update the pg_class entry */
3811 		classform->relfilenode = newrelfilenode;
3812 
3813 		/* relpages etc. never change for sequences */
3814 		if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
3815 		{
3816 			classform->relpages = 0;	/* it's empty until further notice */
3817 			classform->reltuples = 0;
3818 			classform->relallvisible = 0;
3819 		}
3820 		classform->relfrozenxid = freezeXid;
3821 		classform->relminmxid = minmulti;
3822 		classform->relpersistence = persistence;
3823 
3824 		CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
3825 	}
3826 
3827 	heap_freetuple(tuple);
3828 
3829 	table_close(pg_class, RowExclusiveLock);
3830 
3831 	/*
3832 	 * Make the pg_class row change or relation map change visible.  This will
3833 	 * cause the relcache entry to get updated, too.
3834 	 */
3835 	CommandCounterIncrement();
3836 
3837 	RelationAssumeNewRelfilenode(relation);
3838 }
3839 
3840 /*
3841  * RelationAssumeNewRelfilenode
3842  *
3843  * Code that modifies pg_class.reltablespace or pg_class.relfilenode must call
3844  * this.  The call shall precede any code that might insert WAL records whose
3845  * replay would modify bytes in the new RelFileNode, and the call shall follow
3846  * any WAL modifying bytes in the prior RelFileNode.  See struct RelationData.
3847  * Ideally, call this as near as possible to the CommandCounterIncrement()
3848  * that makes the pg_class change visible (before it or after it); that
3849  * minimizes the chance of future development adding a forbidden WAL insertion
3850  * between RelationAssumeNewRelfilenode() and CommandCounterIncrement().
3851  */
3852 void
RelationAssumeNewRelfilenode(Relation relation)3853 RelationAssumeNewRelfilenode(Relation relation)
3854 {
3855 	relation->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
3856 	if (relation->rd_firstRelfilenodeSubid == InvalidSubTransactionId)
3857 		relation->rd_firstRelfilenodeSubid = relation->rd_newRelfilenodeSubid;
3858 
3859 	/* Flag relation as needing eoxact cleanup (to clear these fields) */
3860 	EOXactListAdd(relation);
3861 }
3862 
3863 
3864 /*
3865  *		RelationCacheInitialize
3866  *
3867  *		This initializes the relation descriptor cache.  At the time
3868  *		that this is invoked, we can't do database access yet (mainly
3869  *		because the transaction subsystem is not up); all we are doing
3870  *		is making an empty cache hashtable.  This must be done before
3871  *		starting the initialization transaction, because otherwise
3872  *		AtEOXact_RelationCache would crash if that transaction aborts
3873  *		before we can get the relcache set up.
3874  */
3875 
3876 #define INITRELCACHESIZE		400
3877 
3878 void
RelationCacheInitialize(void)3879 RelationCacheInitialize(void)
3880 {
3881 	HASHCTL		ctl;
3882 	int			allocsize;
3883 
3884 	/*
3885 	 * make sure cache memory context exists
3886 	 */
3887 	if (!CacheMemoryContext)
3888 		CreateCacheMemoryContext();
3889 
3890 	/*
3891 	 * create hashtable that indexes the relcache
3892 	 */
3893 	MemSet(&ctl, 0, sizeof(ctl));
3894 	ctl.keysize = sizeof(Oid);
3895 	ctl.entrysize = sizeof(RelIdCacheEnt);
3896 	RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
3897 								  &ctl, HASH_ELEM | HASH_BLOBS);
3898 
3899 	/*
3900 	 * reserve enough in_progress_list slots for many cases
3901 	 */
3902 	allocsize = 4;
3903 	in_progress_list =
3904 		MemoryContextAlloc(CacheMemoryContext,
3905 						   allocsize * sizeof(*in_progress_list));
3906 	in_progress_list_maxlen = allocsize;
3907 
3908 	/*
3909 	 * relation mapper needs to be initialized too
3910 	 */
3911 	RelationMapInitialize();
3912 }
3913 
3914 /*
3915  *		RelationCacheInitializePhase2
3916  *
3917  *		This is called to prepare for access to shared catalogs during startup.
3918  *		We must at least set up nailed reldescs for pg_database, pg_authid,
3919  *		pg_auth_members, and pg_shseclabel. Ideally we'd like to have reldescs
3920  *		for their indexes, too.  We attempt to load this information from the
3921  *		shared relcache init file.  If that's missing or broken, just make
3922  *		phony entries for the catalogs themselves.
3923  *		RelationCacheInitializePhase3 will clean up as needed.
3924  */
3925 void
RelationCacheInitializePhase2(void)3926 RelationCacheInitializePhase2(void)
3927 {
3928 	MemoryContext oldcxt;
3929 
3930 	/*
3931 	 * relation mapper needs initialized too
3932 	 */
3933 	RelationMapInitializePhase2();
3934 
3935 	/*
3936 	 * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
3937 	 * nothing.
3938 	 */
3939 	if (IsBootstrapProcessingMode())
3940 		return;
3941 
3942 	/*
3943 	 * switch to cache memory context
3944 	 */
3945 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3946 
3947 	/*
3948 	 * Try to load the shared relcache cache file.  If unsuccessful, bootstrap
3949 	 * the cache with pre-made descriptors for the critical shared catalogs.
3950 	 */
3951 	if (!load_relcache_init_file(true))
3952 	{
3953 		formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
3954 				  Natts_pg_database, Desc_pg_database);
3955 		formrdesc("pg_authid", AuthIdRelation_Rowtype_Id, true,
3956 				  Natts_pg_authid, Desc_pg_authid);
3957 		formrdesc("pg_auth_members", AuthMemRelation_Rowtype_Id, true,
3958 				  Natts_pg_auth_members, Desc_pg_auth_members);
3959 		formrdesc("pg_shseclabel", SharedSecLabelRelation_Rowtype_Id, true,
3960 				  Natts_pg_shseclabel, Desc_pg_shseclabel);
3961 		formrdesc("pg_subscription", SubscriptionRelation_Rowtype_Id, true,
3962 				  Natts_pg_subscription, Desc_pg_subscription);
3963 
3964 #define NUM_CRITICAL_SHARED_RELS	5	/* fix if you change list above */
3965 	}
3966 
3967 	MemoryContextSwitchTo(oldcxt);
3968 }
3969 
3970 /*
3971  *		RelationCacheInitializePhase3
3972  *
3973  *		This is called as soon as the catcache and transaction system
3974  *		are functional and we have determined MyDatabaseId.  At this point
3975  *		we can actually read data from the database's system catalogs.
3976  *		We first try to read pre-computed relcache entries from the local
3977  *		relcache init file.  If that's missing or broken, make phony entries
3978  *		for the minimum set of nailed-in-cache relations.  Then (unless
3979  *		bootstrapping) make sure we have entries for the critical system
3980  *		indexes.  Once we've done all this, we have enough infrastructure to
3981  *		open any system catalog or use any catcache.  The last step is to
3982  *		rewrite the cache files if needed.
3983  */
3984 void
RelationCacheInitializePhase3(void)3985 RelationCacheInitializePhase3(void)
3986 {
3987 	HASH_SEQ_STATUS status;
3988 	RelIdCacheEnt *idhentry;
3989 	MemoryContext oldcxt;
3990 	bool		needNewCacheFile = !criticalSharedRelcachesBuilt;
3991 
3992 	/*
3993 	 * relation mapper needs initialized too
3994 	 */
3995 	RelationMapInitializePhase3();
3996 
3997 	/*
3998 	 * switch to cache memory context
3999 	 */
4000 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4001 
4002 	/*
4003 	 * Try to load the local relcache cache file.  If unsuccessful, bootstrap
4004 	 * the cache with pre-made descriptors for the critical "nailed-in" system
4005 	 * catalogs.
4006 	 */
4007 	if (IsBootstrapProcessingMode() ||
4008 		!load_relcache_init_file(false))
4009 	{
4010 		needNewCacheFile = true;
4011 
4012 		formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
4013 				  Natts_pg_class, Desc_pg_class);
4014 		formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
4015 				  Natts_pg_attribute, Desc_pg_attribute);
4016 		formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
4017 				  Natts_pg_proc, Desc_pg_proc);
4018 		formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
4019 				  Natts_pg_type, Desc_pg_type);
4020 
4021 #define NUM_CRITICAL_LOCAL_RELS 4	/* fix if you change list above */
4022 	}
4023 
4024 	MemoryContextSwitchTo(oldcxt);
4025 
4026 	/* In bootstrap mode, the faked-up formrdesc info is all we'll have */
4027 	if (IsBootstrapProcessingMode())
4028 		return;
4029 
4030 	/*
4031 	 * If we didn't get the critical system indexes loaded into relcache, do
4032 	 * so now.  These are critical because the catcache and/or opclass cache
4033 	 * depend on them for fetches done during relcache load.  Thus, we have an
4034 	 * infinite-recursion problem.  We can break the recursion by doing
4035 	 * heapscans instead of indexscans at certain key spots. To avoid hobbling
4036 	 * performance, we only want to do that until we have the critical indexes
4037 	 * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
4038 	 * decide whether to do heapscan or indexscan at the key spots, and we set
4039 	 * it true after we've loaded the critical indexes.
4040 	 *
4041 	 * The critical indexes are marked as "nailed in cache", partly to make it
4042 	 * easy for load_relcache_init_file to count them, but mainly because we
4043 	 * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
4044 	 * true.  (NOTE: perhaps it would be possible to reload them by
4045 	 * temporarily setting criticalRelcachesBuilt to false again.  For now,
4046 	 * though, we just nail 'em in.)
4047 	 *
4048 	 * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
4049 	 * in the same way as the others, because the critical catalogs don't
4050 	 * (currently) have any rules or triggers, and so these indexes can be
4051 	 * rebuilt without inducing recursion.  However they are used during
4052 	 * relcache load when a rel does have rules or triggers, so we choose to
4053 	 * nail them for performance reasons.
4054 	 */
4055 	if (!criticalRelcachesBuilt)
4056 	{
4057 		load_critical_index(ClassOidIndexId,
4058 							RelationRelationId);
4059 		load_critical_index(AttributeRelidNumIndexId,
4060 							AttributeRelationId);
4061 		load_critical_index(IndexRelidIndexId,
4062 							IndexRelationId);
4063 		load_critical_index(OpclassOidIndexId,
4064 							OperatorClassRelationId);
4065 		load_critical_index(AccessMethodProcedureIndexId,
4066 							AccessMethodProcedureRelationId);
4067 		load_critical_index(RewriteRelRulenameIndexId,
4068 							RewriteRelationId);
4069 		load_critical_index(TriggerRelidNameIndexId,
4070 							TriggerRelationId);
4071 
4072 #define NUM_CRITICAL_LOCAL_INDEXES	7	/* fix if you change list above */
4073 
4074 		criticalRelcachesBuilt = true;
4075 	}
4076 
4077 	/*
4078 	 * Process critical shared indexes too.
4079 	 *
4080 	 * DatabaseNameIndexId isn't critical for relcache loading, but rather for
4081 	 * initial lookup of MyDatabaseId, without which we'll never find any
4082 	 * non-shared catalogs at all.  Autovacuum calls InitPostgres with a
4083 	 * database OID, so it instead depends on DatabaseOidIndexId.  We also
4084 	 * need to nail up some indexes on pg_authid and pg_auth_members for use
4085 	 * during client authentication.  SharedSecLabelObjectIndexId isn't
4086 	 * critical for the core system, but authentication hooks might be
4087 	 * interested in it.
4088 	 */
4089 	if (!criticalSharedRelcachesBuilt)
4090 	{
4091 		load_critical_index(DatabaseNameIndexId,
4092 							DatabaseRelationId);
4093 		load_critical_index(DatabaseOidIndexId,
4094 							DatabaseRelationId);
4095 		load_critical_index(AuthIdRolnameIndexId,
4096 							AuthIdRelationId);
4097 		load_critical_index(AuthIdOidIndexId,
4098 							AuthIdRelationId);
4099 		load_critical_index(AuthMemMemRoleIndexId,
4100 							AuthMemRelationId);
4101 		load_critical_index(SharedSecLabelObjectIndexId,
4102 							SharedSecLabelRelationId);
4103 
4104 #define NUM_CRITICAL_SHARED_INDEXES 6	/* fix if you change list above */
4105 
4106 		criticalSharedRelcachesBuilt = true;
4107 	}
4108 
4109 	/*
4110 	 * Now, scan all the relcache entries and update anything that might be
4111 	 * wrong in the results from formrdesc or the relcache cache file. If we
4112 	 * faked up relcache entries using formrdesc, then read the real pg_class
4113 	 * rows and replace the fake entries with them. Also, if any of the
4114 	 * relcache entries have rules, triggers, or security policies, load that
4115 	 * info the hard way since it isn't recorded in the cache file.
4116 	 *
4117 	 * Whenever we access the catalogs to read data, there is a possibility of
4118 	 * a shared-inval cache flush causing relcache entries to be removed.
4119 	 * Since hash_seq_search only guarantees to still work after the *current*
4120 	 * entry is removed, it's unsafe to continue the hashtable scan afterward.
4121 	 * We handle this by restarting the scan from scratch after each access.
4122 	 * This is theoretically O(N^2), but the number of entries that actually
4123 	 * need to be fixed is small enough that it doesn't matter.
4124 	 */
4125 	hash_seq_init(&status, RelationIdCache);
4126 
4127 	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
4128 	{
4129 		Relation	relation = idhentry->reldesc;
4130 		bool		restart = false;
4131 
4132 		/*
4133 		 * Make sure *this* entry doesn't get flushed while we work with it.
4134 		 */
4135 		RelationIncrementReferenceCount(relation);
4136 
4137 		/*
4138 		 * If it's a faked-up entry, read the real pg_class tuple.
4139 		 */
4140 		if (relation->rd_rel->relowner == InvalidOid)
4141 		{
4142 			HeapTuple	htup;
4143 			Form_pg_class relp;
4144 
4145 			htup = SearchSysCache1(RELOID,
4146 								   ObjectIdGetDatum(RelationGetRelid(relation)));
4147 			if (!HeapTupleIsValid(htup))
4148 				elog(FATAL, "cache lookup failed for relation %u",
4149 					 RelationGetRelid(relation));
4150 			relp = (Form_pg_class) GETSTRUCT(htup);
4151 
4152 			/*
4153 			 * Copy tuple to relation->rd_rel. (See notes in
4154 			 * AllocateRelationDesc())
4155 			 */
4156 			memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
4157 
4158 			/* Update rd_options while we have the tuple */
4159 			if (relation->rd_options)
4160 				pfree(relation->rd_options);
4161 			RelationParseRelOptions(relation, htup);
4162 
4163 			/*
4164 			 * Check the values in rd_att were set up correctly.  (We cannot
4165 			 * just copy them over now: formrdesc must have set up the rd_att
4166 			 * data correctly to start with, because it may already have been
4167 			 * copied into one or more catcache entries.)
4168 			 */
4169 			Assert(relation->rd_att->tdtypeid == relp->reltype);
4170 			Assert(relation->rd_att->tdtypmod == -1);
4171 
4172 			ReleaseSysCache(htup);
4173 
4174 			/* relowner had better be OK now, else we'll loop forever */
4175 			if (relation->rd_rel->relowner == InvalidOid)
4176 				elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
4177 					 RelationGetRelationName(relation));
4178 
4179 			restart = true;
4180 		}
4181 
4182 		/*
4183 		 * Fix data that isn't saved in relcache cache file.
4184 		 *
4185 		 * relhasrules or relhastriggers could possibly be wrong or out of
4186 		 * date.  If we don't actually find any rules or triggers, clear the
4187 		 * local copy of the flag so that we don't get into an infinite loop
4188 		 * here.  We don't make any attempt to fix the pg_class entry, though.
4189 		 */
4190 		if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
4191 		{
4192 			RelationBuildRuleLock(relation);
4193 			if (relation->rd_rules == NULL)
4194 				relation->rd_rel->relhasrules = false;
4195 			restart = true;
4196 		}
4197 		if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
4198 		{
4199 			RelationBuildTriggers(relation);
4200 			if (relation->trigdesc == NULL)
4201 				relation->rd_rel->relhastriggers = false;
4202 			restart = true;
4203 		}
4204 
4205 		/*
4206 		 * Re-load the row security policies if the relation has them, since
4207 		 * they are not preserved in the cache.  Note that we can never NOT
4208 		 * have a policy while relrowsecurity is true,
4209 		 * RelationBuildRowSecurity will create a single default-deny policy
4210 		 * if there is no policy defined in pg_policy.
4211 		 */
4212 		if (relation->rd_rel->relrowsecurity && relation->rd_rsdesc == NULL)
4213 		{
4214 			RelationBuildRowSecurity(relation);
4215 
4216 			Assert(relation->rd_rsdesc != NULL);
4217 			restart = true;
4218 		}
4219 
4220 		/* Reload tableam data if needed */
4221 		if (relation->rd_tableam == NULL &&
4222 			(relation->rd_rel->relkind == RELKIND_RELATION ||
4223 			 relation->rd_rel->relkind == RELKIND_SEQUENCE ||
4224 			 relation->rd_rel->relkind == RELKIND_TOASTVALUE ||
4225 			 relation->rd_rel->relkind == RELKIND_MATVIEW))
4226 		{
4227 			RelationInitTableAccessMethod(relation);
4228 			Assert(relation->rd_tableam != NULL);
4229 
4230 			restart = true;
4231 		}
4232 
4233 		/* Release hold on the relation */
4234 		RelationDecrementReferenceCount(relation);
4235 
4236 		/* Now, restart the hashtable scan if needed */
4237 		if (restart)
4238 		{
4239 			hash_seq_term(&status);
4240 			hash_seq_init(&status, RelationIdCache);
4241 		}
4242 	}
4243 
4244 	/*
4245 	 * Lastly, write out new relcache cache files if needed.  We don't bother
4246 	 * to distinguish cases where only one of the two needs an update.
4247 	 */
4248 	if (needNewCacheFile)
4249 	{
4250 		/*
4251 		 * Force all the catcaches to finish initializing and thereby open the
4252 		 * catalogs and indexes they use.  This will preload the relcache with
4253 		 * entries for all the most important system catalogs and indexes, so
4254 		 * that the init files will be most useful for future backends.
4255 		 */
4256 		InitCatalogCachePhase2();
4257 
4258 		/* now write the files */
4259 		write_relcache_init_file(true);
4260 		write_relcache_init_file(false);
4261 	}
4262 }
4263 
4264 /*
4265  * Load one critical system index into the relcache
4266  *
4267  * indexoid is the OID of the target index, heapoid is the OID of the catalog
4268  * it belongs to.
4269  */
4270 static void
load_critical_index(Oid indexoid,Oid heapoid)4271 load_critical_index(Oid indexoid, Oid heapoid)
4272 {
4273 	Relation	ird;
4274 
4275 	/*
4276 	 * We must lock the underlying catalog before locking the index to avoid
4277 	 * deadlock, since RelationBuildDesc might well need to read the catalog,
4278 	 * and if anyone else is exclusive-locking this catalog and index they'll
4279 	 * be doing it in that order.
4280 	 */
4281 	LockRelationOid(heapoid, AccessShareLock);
4282 	LockRelationOid(indexoid, AccessShareLock);
4283 	ird = RelationBuildDesc(indexoid, true);
4284 	if (ird == NULL)
4285 		elog(PANIC, "could not open critical system index %u", indexoid);
4286 	ird->rd_isnailed = true;
4287 	ird->rd_refcnt = 1;
4288 	UnlockRelationOid(indexoid, AccessShareLock);
4289 	UnlockRelationOid(heapoid, AccessShareLock);
4290 
4291 	(void) RelationGetIndexAttOptions(ird, false);
4292 }
4293 
4294 /*
4295  * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
4296  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
4297  *
4298  * We need this kluge because we have to be able to access non-fixed-width
4299  * fields of pg_class and pg_index before we have the standard catalog caches
4300  * available.  We use predefined data that's set up in just the same way as
4301  * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
4302  * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
4303  * does it have a TupleConstr field.  But it's good enough for the purpose of
4304  * extracting fields.
4305  */
4306 static TupleDesc
BuildHardcodedDescriptor(int natts,const FormData_pg_attribute * attrs)4307 BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs)
4308 {
4309 	TupleDesc	result;
4310 	MemoryContext oldcxt;
4311 	int			i;
4312 
4313 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4314 
4315 	result = CreateTemplateTupleDesc(natts);
4316 	result->tdtypeid = RECORDOID;	/* not right, but we don't care */
4317 	result->tdtypmod = -1;
4318 
4319 	for (i = 0; i < natts; i++)
4320 	{
4321 		memcpy(TupleDescAttr(result, i), &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
4322 		/* make sure attcacheoff is valid */
4323 		TupleDescAttr(result, i)->attcacheoff = -1;
4324 	}
4325 
4326 	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
4327 	TupleDescAttr(result, 0)->attcacheoff = 0;
4328 
4329 	/* Note: we don't bother to set up a TupleConstr entry */
4330 
4331 	MemoryContextSwitchTo(oldcxt);
4332 
4333 	return result;
4334 }
4335 
4336 static TupleDesc
GetPgClassDescriptor(void)4337 GetPgClassDescriptor(void)
4338 {
4339 	static TupleDesc pgclassdesc = NULL;
4340 
4341 	/* Already done? */
4342 	if (pgclassdesc == NULL)
4343 		pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
4344 											   Desc_pg_class);
4345 
4346 	return pgclassdesc;
4347 }
4348 
4349 static TupleDesc
GetPgIndexDescriptor(void)4350 GetPgIndexDescriptor(void)
4351 {
4352 	static TupleDesc pgindexdesc = NULL;
4353 
4354 	/* Already done? */
4355 	if (pgindexdesc == NULL)
4356 		pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
4357 											   Desc_pg_index);
4358 
4359 	return pgindexdesc;
4360 }
4361 
4362 /*
4363  * Load any default attribute value definitions for the relation.
4364  */
4365 static void
AttrDefaultFetch(Relation relation)4366 AttrDefaultFetch(Relation relation)
4367 {
4368 	AttrDefault *attrdef = relation->rd_att->constr->defval;
4369 	int			ndef = relation->rd_att->constr->num_defval;
4370 	Relation	adrel;
4371 	SysScanDesc adscan;
4372 	ScanKeyData skey;
4373 	HeapTuple	htup;
4374 	Datum		val;
4375 	bool		isnull;
4376 	int			found;
4377 	int			i;
4378 
4379 	ScanKeyInit(&skey,
4380 				Anum_pg_attrdef_adrelid,
4381 				BTEqualStrategyNumber, F_OIDEQ,
4382 				ObjectIdGetDatum(RelationGetRelid(relation)));
4383 
4384 	adrel = table_open(AttrDefaultRelationId, AccessShareLock);
4385 	adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
4386 								NULL, 1, &skey);
4387 	found = 0;
4388 
4389 	while (HeapTupleIsValid(htup = systable_getnext(adscan)))
4390 	{
4391 		Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
4392 		Form_pg_attribute attr = TupleDescAttr(relation->rd_att, adform->adnum - 1);
4393 
4394 		for (i = 0; i < ndef; i++)
4395 		{
4396 			if (adform->adnum != attrdef[i].adnum)
4397 				continue;
4398 			if (attrdef[i].adbin != NULL)
4399 				elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
4400 					 NameStr(attr->attname),
4401 					 RelationGetRelationName(relation));
4402 			else
4403 				found++;
4404 
4405 			val = fastgetattr(htup,
4406 							  Anum_pg_attrdef_adbin,
4407 							  adrel->rd_att, &isnull);
4408 			if (isnull)
4409 				elog(WARNING, "null adbin for attr %s of rel %s",
4410 					 NameStr(attr->attname),
4411 					 RelationGetRelationName(relation));
4412 			else
4413 			{
4414 				/* detoast and convert to cstring in caller's context */
4415 				char	   *s = TextDatumGetCString(val);
4416 
4417 				attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext, s);
4418 				pfree(s);
4419 			}
4420 			break;
4421 		}
4422 
4423 		if (i >= ndef)
4424 			elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
4425 				 adform->adnum, RelationGetRelationName(relation));
4426 	}
4427 
4428 	systable_endscan(adscan);
4429 	table_close(adrel, AccessShareLock);
4430 }
4431 
4432 /*
4433  * Load any check constraints for the relation.
4434  */
4435 static void
CheckConstraintFetch(Relation relation)4436 CheckConstraintFetch(Relation relation)
4437 {
4438 	ConstrCheck *check = relation->rd_att->constr->check;
4439 	int			ncheck = relation->rd_att->constr->num_check;
4440 	Relation	conrel;
4441 	SysScanDesc conscan;
4442 	ScanKeyData skey[1];
4443 	HeapTuple	htup;
4444 	int			found = 0;
4445 
4446 	ScanKeyInit(&skey[0],
4447 				Anum_pg_constraint_conrelid,
4448 				BTEqualStrategyNumber, F_OIDEQ,
4449 				ObjectIdGetDatum(RelationGetRelid(relation)));
4450 
4451 	conrel = table_open(ConstraintRelationId, AccessShareLock);
4452 	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
4453 								 NULL, 1, skey);
4454 
4455 	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4456 	{
4457 		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
4458 		Datum		val;
4459 		bool		isnull;
4460 		char	   *s;
4461 
4462 		/* We want check constraints only */
4463 		if (conform->contype != CONSTRAINT_CHECK)
4464 			continue;
4465 
4466 		if (found >= ncheck)
4467 			elog(ERROR, "unexpected constraint record found for rel %s",
4468 				 RelationGetRelationName(relation));
4469 
4470 		check[found].ccvalid = conform->convalidated;
4471 		check[found].ccnoinherit = conform->connoinherit;
4472 		check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
4473 												  NameStr(conform->conname));
4474 
4475 		/* Grab and test conbin is actually set */
4476 		val = fastgetattr(htup,
4477 						  Anum_pg_constraint_conbin,
4478 						  conrel->rd_att, &isnull);
4479 		if (isnull)
4480 			elog(ERROR, "null conbin for rel %s",
4481 				 RelationGetRelationName(relation));
4482 
4483 		/* detoast and convert to cstring in caller's context */
4484 		s = TextDatumGetCString(val);
4485 		check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
4486 		pfree(s);
4487 
4488 		found++;
4489 	}
4490 
4491 	systable_endscan(conscan);
4492 	table_close(conrel, AccessShareLock);
4493 
4494 	if (found != ncheck)
4495 		elog(ERROR, "%d constraint record(s) missing for rel %s",
4496 			 ncheck - found, RelationGetRelationName(relation));
4497 
4498 	/* Sort the records so that CHECKs are applied in a deterministic order */
4499 	if (ncheck > 1)
4500 		qsort(check, ncheck, sizeof(ConstrCheck), CheckConstraintCmp);
4501 }
4502 
4503 /*
4504  * qsort comparator to sort ConstrCheck entries by name
4505  */
4506 static int
CheckConstraintCmp(const void * a,const void * b)4507 CheckConstraintCmp(const void *a, const void *b)
4508 {
4509 	const ConstrCheck *ca = (const ConstrCheck *) a;
4510 	const ConstrCheck *cb = (const ConstrCheck *) b;
4511 
4512 	return strcmp(ca->ccname, cb->ccname);
4513 }
4514 
4515 /*
4516  * RelationGetFKeyList -- get a list of foreign key info for the relation
4517  *
4518  * Returns a list of ForeignKeyCacheInfo structs, one per FK constraining
4519  * the given relation.  This data is a direct copy of relevant fields from
4520  * pg_constraint.  The list items are in no particular order.
4521  *
4522  * CAUTION: the returned list is part of the relcache's data, and could
4523  * vanish in a relcache entry reset.  Callers must inspect or copy it
4524  * before doing anything that might trigger a cache flush, such as
4525  * system catalog accesses.  copyObject() can be used if desired.
4526  * (We define it this way because current callers want to filter and
4527  * modify the list entries anyway, so copying would be a waste of time.)
4528  */
4529 List *
RelationGetFKeyList(Relation relation)4530 RelationGetFKeyList(Relation relation)
4531 {
4532 	List	   *result;
4533 	Relation	conrel;
4534 	SysScanDesc conscan;
4535 	ScanKeyData skey;
4536 	HeapTuple	htup;
4537 	List	   *oldlist;
4538 	MemoryContext oldcxt;
4539 
4540 	/* Quick exit if we already computed the list. */
4541 	if (relation->rd_fkeyvalid)
4542 		return relation->rd_fkeylist;
4543 
4544 	/* Fast path: non-partitioned tables without triggers can't have FKs */
4545 	if (!relation->rd_rel->relhastriggers &&
4546 		relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
4547 		return NIL;
4548 
4549 	/*
4550 	 * We build the list we intend to return (in the caller's context) while
4551 	 * doing the scan.  After successfully completing the scan, we copy that
4552 	 * list into the relcache entry.  This avoids cache-context memory leakage
4553 	 * if we get some sort of error partway through.
4554 	 */
4555 	result = NIL;
4556 
4557 	/* Prepare to scan pg_constraint for entries having conrelid = this rel. */
4558 	ScanKeyInit(&skey,
4559 				Anum_pg_constraint_conrelid,
4560 				BTEqualStrategyNumber, F_OIDEQ,
4561 				ObjectIdGetDatum(RelationGetRelid(relation)));
4562 
4563 	conrel = table_open(ConstraintRelationId, AccessShareLock);
4564 	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
4565 								 NULL, 1, &skey);
4566 
4567 	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
4568 	{
4569 		Form_pg_constraint constraint = (Form_pg_constraint) GETSTRUCT(htup);
4570 		ForeignKeyCacheInfo *info;
4571 
4572 		/* consider only foreign keys */
4573 		if (constraint->contype != CONSTRAINT_FOREIGN)
4574 			continue;
4575 
4576 		info = makeNode(ForeignKeyCacheInfo);
4577 		info->conoid = constraint->oid;
4578 		info->conrelid = constraint->conrelid;
4579 		info->confrelid = constraint->confrelid;
4580 
4581 		DeconstructFkConstraintRow(htup, &info->nkeys,
4582 								   info->conkey,
4583 								   info->confkey,
4584 								   info->conpfeqop,
4585 								   NULL, NULL);
4586 
4587 		/* Add FK's node to the result list */
4588 		result = lappend(result, info);
4589 	}
4590 
4591 	systable_endscan(conscan);
4592 	table_close(conrel, AccessShareLock);
4593 
4594 	/* Now save a copy of the completed list in the relcache entry. */
4595 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4596 	oldlist = relation->rd_fkeylist;
4597 	relation->rd_fkeylist = copyObject(result);
4598 	relation->rd_fkeyvalid = true;
4599 	MemoryContextSwitchTo(oldcxt);
4600 
4601 	/* Don't leak the old list, if there is one */
4602 	list_free_deep(oldlist);
4603 
4604 	return result;
4605 }
4606 
4607 /*
4608  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
4609  *
4610  * The index list is created only if someone requests it.  We scan pg_index
4611  * to find relevant indexes, and add the list to the relcache entry so that
4612  * we won't have to compute it again.  Note that shared cache inval of a
4613  * relcache entry will delete the old list and set rd_indexvalid to false,
4614  * so that we must recompute the index list on next request.  This handles
4615  * creation or deletion of an index.
4616  *
4617  * Indexes that are marked not indislive are omitted from the returned list.
4618  * Such indexes are expected to be dropped momentarily, and should not be
4619  * touched at all by any caller of this function.
4620  *
4621  * The returned list is guaranteed to be sorted in order by OID.  This is
4622  * needed by the executor, since for index types that we obtain exclusive
4623  * locks on when updating the index, all backends must lock the indexes in
4624  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
4625  * consistent ordering would do, but ordering by OID is easy.
4626  *
4627  * Since shared cache inval causes the relcache's copy of the list to go away,
4628  * we return a copy of the list palloc'd in the caller's context.  The caller
4629  * may list_free() the returned list after scanning it. This is necessary
4630  * since the caller will typically be doing syscache lookups on the relevant
4631  * indexes, and syscache lookup could cause SI messages to be processed!
4632  *
4633  * In exactly the same way, we update rd_pkindex, which is the OID of the
4634  * relation's primary key index if any, else InvalidOid; and rd_replidindex,
4635  * which is the pg_class OID of an index to be used as the relation's
4636  * replication identity index, or InvalidOid if there is no such index.
4637  */
4638 List *
RelationGetIndexList(Relation relation)4639 RelationGetIndexList(Relation relation)
4640 {
4641 	Relation	indrel;
4642 	SysScanDesc indscan;
4643 	ScanKeyData skey;
4644 	HeapTuple	htup;
4645 	List	   *result;
4646 	List	   *oldlist;
4647 	char		replident = relation->rd_rel->relreplident;
4648 	Oid			pkeyIndex = InvalidOid;
4649 	Oid			candidateIndex = InvalidOid;
4650 	MemoryContext oldcxt;
4651 
4652 	/* Quick exit if we already computed the list. */
4653 	if (relation->rd_indexvalid)
4654 		return list_copy(relation->rd_indexlist);
4655 
4656 	/*
4657 	 * We build the list we intend to return (in the caller's context) while
4658 	 * doing the scan.  After successfully completing the scan, we copy that
4659 	 * list into the relcache entry.  This avoids cache-context memory leakage
4660 	 * if we get some sort of error partway through.
4661 	 */
4662 	result = NIL;
4663 
4664 	/* Prepare to scan pg_index for entries having indrelid = this rel. */
4665 	ScanKeyInit(&skey,
4666 				Anum_pg_index_indrelid,
4667 				BTEqualStrategyNumber, F_OIDEQ,
4668 				ObjectIdGetDatum(RelationGetRelid(relation)));
4669 
4670 	indrel = table_open(IndexRelationId, AccessShareLock);
4671 	indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
4672 								 NULL, 1, &skey);
4673 
4674 	while (HeapTupleIsValid(htup = systable_getnext(indscan)))
4675 	{
4676 		Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
4677 
4678 		/*
4679 		 * Ignore any indexes that are currently being dropped.  This will
4680 		 * prevent them from being searched, inserted into, or considered in
4681 		 * HOT-safety decisions.  It's unsafe to touch such an index at all
4682 		 * since its catalog entries could disappear at any instant.
4683 		 */
4684 		if (!index->indislive)
4685 			continue;
4686 
4687 		/* add index's OID to result list */
4688 		result = lappend_oid(result, index->indexrelid);
4689 
4690 		/*
4691 		 * Invalid, non-unique, non-immediate or predicate indexes aren't
4692 		 * interesting for either oid indexes or replication identity indexes,
4693 		 * so don't check them.
4694 		 */
4695 		if (!index->indisvalid || !index->indisunique ||
4696 			!index->indimmediate ||
4697 			!heap_attisnull(htup, Anum_pg_index_indpred, NULL))
4698 			continue;
4699 
4700 		/* remember primary key index if any */
4701 		if (index->indisprimary)
4702 			pkeyIndex = index->indexrelid;
4703 
4704 		/* remember explicitly chosen replica index */
4705 		if (index->indisreplident)
4706 			candidateIndex = index->indexrelid;
4707 	}
4708 
4709 	systable_endscan(indscan);
4710 
4711 	table_close(indrel, AccessShareLock);
4712 
4713 	/* Sort the result list into OID order, per API spec. */
4714 	list_sort(result, list_oid_cmp);
4715 
4716 	/* Now save a copy of the completed list in the relcache entry. */
4717 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4718 	oldlist = relation->rd_indexlist;
4719 	relation->rd_indexlist = list_copy(result);
4720 	relation->rd_pkindex = pkeyIndex;
4721 	if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkeyIndex))
4722 		relation->rd_replidindex = pkeyIndex;
4723 	else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(candidateIndex))
4724 		relation->rd_replidindex = candidateIndex;
4725 	else
4726 		relation->rd_replidindex = InvalidOid;
4727 	relation->rd_indexvalid = true;
4728 	MemoryContextSwitchTo(oldcxt);
4729 
4730 	/* Don't leak the old list, if there is one */
4731 	list_free(oldlist);
4732 
4733 	return result;
4734 }
4735 
4736 /*
4737  * RelationGetStatExtList
4738  *		get a list of OIDs of statistics objects on this relation
4739  *
4740  * The statistics list is created only if someone requests it, in a way
4741  * similar to RelationGetIndexList().  We scan pg_statistic_ext to find
4742  * relevant statistics, and add the list to the relcache entry so that we
4743  * won't have to compute it again.  Note that shared cache inval of a
4744  * relcache entry will delete the old list and set rd_statvalid to 0,
4745  * so that we must recompute the statistics list on next request.  This
4746  * handles creation or deletion of a statistics object.
4747  *
4748  * The returned list is guaranteed to be sorted in order by OID, although
4749  * this is not currently needed.
4750  *
4751  * Since shared cache inval causes the relcache's copy of the list to go away,
4752  * we return a copy of the list palloc'd in the caller's context.  The caller
4753  * may list_free() the returned list after scanning it. This is necessary
4754  * since the caller will typically be doing syscache lookups on the relevant
4755  * statistics, and syscache lookup could cause SI messages to be processed!
4756  */
4757 List *
RelationGetStatExtList(Relation relation)4758 RelationGetStatExtList(Relation relation)
4759 {
4760 	Relation	indrel;
4761 	SysScanDesc indscan;
4762 	ScanKeyData skey;
4763 	HeapTuple	htup;
4764 	List	   *result;
4765 	List	   *oldlist;
4766 	MemoryContext oldcxt;
4767 
4768 	/* Quick exit if we already computed the list. */
4769 	if (relation->rd_statvalid != 0)
4770 		return list_copy(relation->rd_statlist);
4771 
4772 	/*
4773 	 * We build the list we intend to return (in the caller's context) while
4774 	 * doing the scan.  After successfully completing the scan, we copy that
4775 	 * list into the relcache entry.  This avoids cache-context memory leakage
4776 	 * if we get some sort of error partway through.
4777 	 */
4778 	result = NIL;
4779 
4780 	/*
4781 	 * Prepare to scan pg_statistic_ext for entries having stxrelid = this
4782 	 * rel.
4783 	 */
4784 	ScanKeyInit(&skey,
4785 				Anum_pg_statistic_ext_stxrelid,
4786 				BTEqualStrategyNumber, F_OIDEQ,
4787 				ObjectIdGetDatum(RelationGetRelid(relation)));
4788 
4789 	indrel = table_open(StatisticExtRelationId, AccessShareLock);
4790 	indscan = systable_beginscan(indrel, StatisticExtRelidIndexId, true,
4791 								 NULL, 1, &skey);
4792 
4793 	while (HeapTupleIsValid(htup = systable_getnext(indscan)))
4794 	{
4795 		Oid			oid = ((Form_pg_statistic_ext) GETSTRUCT(htup))->oid;
4796 
4797 		result = lappend_oid(result, oid);
4798 	}
4799 
4800 	systable_endscan(indscan);
4801 
4802 	table_close(indrel, AccessShareLock);
4803 
4804 	/* Sort the result list into OID order, per API spec. */
4805 	list_sort(result, list_oid_cmp);
4806 
4807 	/* Now save a copy of the completed list in the relcache entry. */
4808 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
4809 	oldlist = relation->rd_statlist;
4810 	relation->rd_statlist = list_copy(result);
4811 
4812 	relation->rd_statvalid = true;
4813 	MemoryContextSwitchTo(oldcxt);
4814 
4815 	/* Don't leak the old list, if there is one */
4816 	list_free(oldlist);
4817 
4818 	return result;
4819 }
4820 
4821 /*
4822  * RelationGetPrimaryKeyIndex -- get OID of the relation's primary key index
4823  *
4824  * Returns InvalidOid if there is no such index.
4825  */
4826 Oid
RelationGetPrimaryKeyIndex(Relation relation)4827 RelationGetPrimaryKeyIndex(Relation relation)
4828 {
4829 	List	   *ilist;
4830 
4831 	if (!relation->rd_indexvalid)
4832 	{
4833 		/* RelationGetIndexList does the heavy lifting. */
4834 		ilist = RelationGetIndexList(relation);
4835 		list_free(ilist);
4836 		Assert(relation->rd_indexvalid);
4837 	}
4838 
4839 	return relation->rd_pkindex;
4840 }
4841 
4842 /*
4843  * RelationGetReplicaIndex -- get OID of the relation's replica identity index
4844  *
4845  * Returns InvalidOid if there is no such index.
4846  */
4847 Oid
RelationGetReplicaIndex(Relation relation)4848 RelationGetReplicaIndex(Relation relation)
4849 {
4850 	List	   *ilist;
4851 
4852 	if (!relation->rd_indexvalid)
4853 	{
4854 		/* RelationGetIndexList does the heavy lifting. */
4855 		ilist = RelationGetIndexList(relation);
4856 		list_free(ilist);
4857 		Assert(relation->rd_indexvalid);
4858 	}
4859 
4860 	return relation->rd_replidindex;
4861 }
4862 
4863 /*
4864  * RelationGetIndexExpressions -- get the index expressions for an index
4865  *
4866  * We cache the result of transforming pg_index.indexprs into a node tree.
4867  * If the rel is not an index or has no expressional columns, we return NIL.
4868  * Otherwise, the returned tree is copied into the caller's memory context.
4869  * (We don't want to return a pointer to the relcache copy, since it could
4870  * disappear due to relcache invalidation.)
4871  */
4872 List *
RelationGetIndexExpressions(Relation relation)4873 RelationGetIndexExpressions(Relation relation)
4874 {
4875 	List	   *result;
4876 	Datum		exprsDatum;
4877 	bool		isnull;
4878 	char	   *exprsString;
4879 	MemoryContext oldcxt;
4880 
4881 	/* Quick exit if we already computed the result. */
4882 	if (relation->rd_indexprs)
4883 		return copyObject(relation->rd_indexprs);
4884 
4885 	/* Quick exit if there is nothing to do. */
4886 	if (relation->rd_indextuple == NULL ||
4887 		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
4888 		return NIL;
4889 
4890 	/*
4891 	 * We build the tree we intend to return in the caller's context. After
4892 	 * successfully completing the work, we copy it into the relcache entry.
4893 	 * This avoids problems if we get some sort of error partway through.
4894 	 */
4895 	exprsDatum = heap_getattr(relation->rd_indextuple,
4896 							  Anum_pg_index_indexprs,
4897 							  GetPgIndexDescriptor(),
4898 							  &isnull);
4899 	Assert(!isnull);
4900 	exprsString = TextDatumGetCString(exprsDatum);
4901 	result = (List *) stringToNode(exprsString);
4902 	pfree(exprsString);
4903 
4904 	/*
4905 	 * Run the expressions through eval_const_expressions. This is not just an
4906 	 * optimization, but is necessary, because the planner will be comparing
4907 	 * them to similarly-processed qual clauses, and may fail to detect valid
4908 	 * matches without this.  We must not use canonicalize_qual, however,
4909 	 * since these aren't qual expressions.
4910 	 */
4911 	result = (List *) eval_const_expressions(NULL, (Node *) result);
4912 
4913 	/* May as well fix opfuncids too */
4914 	fix_opfuncids((Node *) result);
4915 
4916 	/* Now save a copy of the completed tree in the relcache entry. */
4917 	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
4918 	relation->rd_indexprs = copyObject(result);
4919 	MemoryContextSwitchTo(oldcxt);
4920 
4921 	return result;
4922 }
4923 
4924 /*
4925  * RelationGetDummyIndexExpressions -- get dummy expressions for an index
4926  *
4927  * Return a list of dummy expressions (just Const nodes) with the same
4928  * types/typmods/collations as the index's real expressions.  This is
4929  * useful in situations where we don't want to run any user-defined code.
4930  */
4931 List *
RelationGetDummyIndexExpressions(Relation relation)4932 RelationGetDummyIndexExpressions(Relation relation)
4933 {
4934 	List	   *result;
4935 	Datum		exprsDatum;
4936 	bool		isnull;
4937 	char	   *exprsString;
4938 	List	   *rawExprs;
4939 	ListCell   *lc;
4940 
4941 	/* Quick exit if there is nothing to do. */
4942 	if (relation->rd_indextuple == NULL ||
4943 		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
4944 		return NIL;
4945 
4946 	/* Extract raw node tree(s) from index tuple. */
4947 	exprsDatum = heap_getattr(relation->rd_indextuple,
4948 							  Anum_pg_index_indexprs,
4949 							  GetPgIndexDescriptor(),
4950 							  &isnull);
4951 	Assert(!isnull);
4952 	exprsString = TextDatumGetCString(exprsDatum);
4953 	rawExprs = (List *) stringToNode(exprsString);
4954 	pfree(exprsString);
4955 
4956 	/* Construct null Consts; the typlen and typbyval are arbitrary. */
4957 	result = NIL;
4958 	foreach(lc, rawExprs)
4959 	{
4960 		Node	   *rawExpr = (Node *) lfirst(lc);
4961 
4962 		result = lappend(result,
4963 						 makeConst(exprType(rawExpr),
4964 								   exprTypmod(rawExpr),
4965 								   exprCollation(rawExpr),
4966 								   1,
4967 								   (Datum) 0,
4968 								   true,
4969 								   true));
4970 	}
4971 
4972 	return result;
4973 }
4974 
4975 /*
4976  * RelationGetIndexPredicate -- get the index predicate for an index
4977  *
4978  * We cache the result of transforming pg_index.indpred into an implicit-AND
4979  * node tree (suitable for use in planning).
4980  * If the rel is not an index or has no predicate, we return NIL.
4981  * Otherwise, the returned tree is copied into the caller's memory context.
4982  * (We don't want to return a pointer to the relcache copy, since it could
4983  * disappear due to relcache invalidation.)
4984  */
4985 List *
RelationGetIndexPredicate(Relation relation)4986 RelationGetIndexPredicate(Relation relation)
4987 {
4988 	List	   *result;
4989 	Datum		predDatum;
4990 	bool		isnull;
4991 	char	   *predString;
4992 	MemoryContext oldcxt;
4993 
4994 	/* Quick exit if we already computed the result. */
4995 	if (relation->rd_indpred)
4996 		return copyObject(relation->rd_indpred);
4997 
4998 	/* Quick exit if there is nothing to do. */
4999 	if (relation->rd_indextuple == NULL ||
5000 		heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred, NULL))
5001 		return NIL;
5002 
5003 	/*
5004 	 * We build the tree we intend to return in the caller's context. After
5005 	 * successfully completing the work, we copy it into the relcache entry.
5006 	 * This avoids problems if we get some sort of error partway through.
5007 	 */
5008 	predDatum = heap_getattr(relation->rd_indextuple,
5009 							 Anum_pg_index_indpred,
5010 							 GetPgIndexDescriptor(),
5011 							 &isnull);
5012 	Assert(!isnull);
5013 	predString = TextDatumGetCString(predDatum);
5014 	result = (List *) stringToNode(predString);
5015 	pfree(predString);
5016 
5017 	/*
5018 	 * Run the expression through const-simplification and canonicalization.
5019 	 * This is not just an optimization, but is necessary, because the planner
5020 	 * will be comparing it to similarly-processed qual clauses, and may fail
5021 	 * to detect valid matches without this.  This must match the processing
5022 	 * done to qual clauses in preprocess_expression()!  (We can skip the
5023 	 * stuff involving subqueries, however, since we don't allow any in index
5024 	 * predicates.)
5025 	 */
5026 	result = (List *) eval_const_expressions(NULL, (Node *) result);
5027 
5028 	result = (List *) canonicalize_qual((Expr *) result, false);
5029 
5030 	/* Also convert to implicit-AND format */
5031 	result = make_ands_implicit((Expr *) result);
5032 
5033 	/* May as well fix opfuncids too */
5034 	fix_opfuncids((Node *) result);
5035 
5036 	/* Now save a copy of the completed tree in the relcache entry. */
5037 	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
5038 	relation->rd_indpred = copyObject(result);
5039 	MemoryContextSwitchTo(oldcxt);
5040 
5041 	return result;
5042 }
5043 
5044 /*
5045  * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
5046  *
5047  * The result has a bit set for each attribute used anywhere in the index
5048  * definitions of all the indexes on this relation.  (This includes not only
5049  * simple index keys, but attributes used in expressions and partial-index
5050  * predicates.)
5051  *
5052  * Depending on attrKind, a bitmap covering the attnums for all index columns,
5053  * for all potential foreign key columns, or for all columns in the configured
5054  * replica identity index is returned.
5055  *
5056  * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
5057  * we can include system attributes (e.g., OID) in the bitmap representation.
5058  *
5059  * Caller had better hold at least RowExclusiveLock on the target relation
5060  * to ensure it is safe (deadlock-free) for us to take locks on the relation's
5061  * indexes.  Note that since the introduction of CREATE INDEX CONCURRENTLY,
5062  * that lock level doesn't guarantee a stable set of indexes, so we have to
5063  * be prepared to retry here in case of a change in the set of indexes.
5064  *
5065  * The returned result is palloc'd in the caller's memory context and should
5066  * be bms_free'd when not needed anymore.
5067  */
5068 Bitmapset *
RelationGetIndexAttrBitmap(Relation relation,IndexAttrBitmapKind attrKind)5069 RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
5070 {
5071 	Bitmapset  *indexattrs;		/* indexed columns */
5072 	Bitmapset  *uindexattrs;	/* columns in unique indexes */
5073 	Bitmapset  *pkindexattrs;	/* columns in the primary index */
5074 	Bitmapset  *idindexattrs;	/* columns in the replica identity */
5075 	List	   *indexoidlist;
5076 	List	   *newindexoidlist;
5077 	Oid			relpkindex;
5078 	Oid			relreplindex;
5079 	ListCell   *l;
5080 	MemoryContext oldcxt;
5081 
5082 	/* Quick exit if we already computed the result. */
5083 	if (relation->rd_indexattr != NULL)
5084 	{
5085 		switch (attrKind)
5086 		{
5087 			case INDEX_ATTR_BITMAP_ALL:
5088 				return bms_copy(relation->rd_indexattr);
5089 			case INDEX_ATTR_BITMAP_KEY:
5090 				return bms_copy(relation->rd_keyattr);
5091 			case INDEX_ATTR_BITMAP_PRIMARY_KEY:
5092 				return bms_copy(relation->rd_pkattr);
5093 			case INDEX_ATTR_BITMAP_IDENTITY_KEY:
5094 				return bms_copy(relation->rd_idattr);
5095 			default:
5096 				elog(ERROR, "unknown attrKind %u", attrKind);
5097 		}
5098 	}
5099 
5100 	/* Fast path if definitely no indexes */
5101 	if (!RelationGetForm(relation)->relhasindex)
5102 		return NULL;
5103 
5104 	/*
5105 	 * Get cached list of index OIDs. If we have to start over, we do so here.
5106 	 */
5107 restart:
5108 	indexoidlist = RelationGetIndexList(relation);
5109 
5110 	/* Fall out if no indexes (but relhasindex was set) */
5111 	if (indexoidlist == NIL)
5112 		return NULL;
5113 
5114 	/*
5115 	 * Copy the rd_pkindex and rd_replidindex values computed by
5116 	 * RelationGetIndexList before proceeding.  This is needed because a
5117 	 * relcache flush could occur inside index_open below, resetting the
5118 	 * fields managed by RelationGetIndexList.  We need to do the work with
5119 	 * stable values of these fields.
5120 	 */
5121 	relpkindex = relation->rd_pkindex;
5122 	relreplindex = relation->rd_replidindex;
5123 
5124 	/*
5125 	 * For each index, add referenced attributes to indexattrs.
5126 	 *
5127 	 * Note: we consider all indexes returned by RelationGetIndexList, even if
5128 	 * they are not indisready or indisvalid.  This is important because an
5129 	 * index for which CREATE INDEX CONCURRENTLY has just started must be
5130 	 * included in HOT-safety decisions (see README.HOT).  If a DROP INDEX
5131 	 * CONCURRENTLY is far enough along that we should ignore the index, it
5132 	 * won't be returned at all by RelationGetIndexList.
5133 	 */
5134 	indexattrs = NULL;
5135 	uindexattrs = NULL;
5136 	pkindexattrs = NULL;
5137 	idindexattrs = NULL;
5138 	foreach(l, indexoidlist)
5139 	{
5140 		Oid			indexOid = lfirst_oid(l);
5141 		Relation	indexDesc;
5142 		Datum		datum;
5143 		bool		isnull;
5144 		Node	   *indexExpressions;
5145 		Node	   *indexPredicate;
5146 		int			i;
5147 		bool		isKey;		/* candidate key */
5148 		bool		isPK;		/* primary key */
5149 		bool		isIDKey;	/* replica identity index */
5150 
5151 		indexDesc = index_open(indexOid, AccessShareLock);
5152 
5153 		/*
5154 		 * Extract index expressions and index predicate.  Note: Don't use
5155 		 * RelationGetIndexExpressions()/RelationGetIndexPredicate(), because
5156 		 * those might run constant expressions evaluation, which needs a
5157 		 * snapshot, which we might not have here.  (Also, it's probably more
5158 		 * sound to collect the bitmaps before any transformations that might
5159 		 * eliminate columns, but the practical impact of this is limited.)
5160 		 */
5161 
5162 		datum = heap_getattr(indexDesc->rd_indextuple, Anum_pg_index_indexprs,
5163 							 GetPgIndexDescriptor(), &isnull);
5164 		if (!isnull)
5165 			indexExpressions = stringToNode(TextDatumGetCString(datum));
5166 		else
5167 			indexExpressions = NULL;
5168 
5169 		datum = heap_getattr(indexDesc->rd_indextuple, Anum_pg_index_indpred,
5170 							 GetPgIndexDescriptor(), &isnull);
5171 		if (!isnull)
5172 			indexPredicate = stringToNode(TextDatumGetCString(datum));
5173 		else
5174 			indexPredicate = NULL;
5175 
5176 		/* Can this index be referenced by a foreign key? */
5177 		isKey = indexDesc->rd_index->indisunique &&
5178 			indexExpressions == NULL &&
5179 			indexPredicate == NULL;
5180 
5181 		/* Is this a primary key? */
5182 		isPK = (indexOid == relpkindex);
5183 
5184 		/* Is this index the configured (or default) replica identity? */
5185 		isIDKey = (indexOid == relreplindex);
5186 
5187 		/* Collect simple attribute references */
5188 		for (i = 0; i < indexDesc->rd_index->indnatts; i++)
5189 		{
5190 			int			attrnum = indexDesc->rd_index->indkey.values[i];
5191 
5192 			/*
5193 			 * Since we have covering indexes with non-key columns, we must
5194 			 * handle them accurately here. non-key columns must be added into
5195 			 * indexattrs, since they are in index, and HOT-update shouldn't
5196 			 * miss them. Obviously, non-key columns couldn't be referenced by
5197 			 * foreign key or identity key. Hence we do not include them into
5198 			 * uindexattrs, pkindexattrs and idindexattrs bitmaps.
5199 			 */
5200 			if (attrnum != 0)
5201 			{
5202 				indexattrs = bms_add_member(indexattrs,
5203 											attrnum - FirstLowInvalidHeapAttributeNumber);
5204 
5205 				if (isKey && i < indexDesc->rd_index->indnkeyatts)
5206 					uindexattrs = bms_add_member(uindexattrs,
5207 												 attrnum - FirstLowInvalidHeapAttributeNumber);
5208 
5209 				if (isPK && i < indexDesc->rd_index->indnkeyatts)
5210 					pkindexattrs = bms_add_member(pkindexattrs,
5211 												  attrnum - FirstLowInvalidHeapAttributeNumber);
5212 
5213 				if (isIDKey && i < indexDesc->rd_index->indnkeyatts)
5214 					idindexattrs = bms_add_member(idindexattrs,
5215 												  attrnum - FirstLowInvalidHeapAttributeNumber);
5216 			}
5217 		}
5218 
5219 		/* Collect all attributes used in expressions, too */
5220 		pull_varattnos(indexExpressions, 1, &indexattrs);
5221 
5222 		/* Collect all attributes in the index predicate, too */
5223 		pull_varattnos(indexPredicate, 1, &indexattrs);
5224 
5225 		index_close(indexDesc, AccessShareLock);
5226 	}
5227 
5228 	/*
5229 	 * During one of the index_opens in the above loop, we might have received
5230 	 * a relcache flush event on this relcache entry, which might have been
5231 	 * signaling a change in the rel's index list.  If so, we'd better start
5232 	 * over to ensure we deliver up-to-date attribute bitmaps.
5233 	 */
5234 	newindexoidlist = RelationGetIndexList(relation);
5235 	if (equal(indexoidlist, newindexoidlist) &&
5236 		relpkindex == relation->rd_pkindex &&
5237 		relreplindex == relation->rd_replidindex)
5238 	{
5239 		/* Still the same index set, so proceed */
5240 		list_free(newindexoidlist);
5241 		list_free(indexoidlist);
5242 	}
5243 	else
5244 	{
5245 		/* Gotta do it over ... might as well not leak memory */
5246 		list_free(newindexoidlist);
5247 		list_free(indexoidlist);
5248 		bms_free(uindexattrs);
5249 		bms_free(pkindexattrs);
5250 		bms_free(idindexattrs);
5251 		bms_free(indexattrs);
5252 
5253 		goto restart;
5254 	}
5255 
5256 	/* Don't leak the old values of these bitmaps, if any */
5257 	bms_free(relation->rd_indexattr);
5258 	relation->rd_indexattr = NULL;
5259 	bms_free(relation->rd_keyattr);
5260 	relation->rd_keyattr = NULL;
5261 	bms_free(relation->rd_pkattr);
5262 	relation->rd_pkattr = NULL;
5263 	bms_free(relation->rd_idattr);
5264 	relation->rd_idattr = NULL;
5265 
5266 	/*
5267 	 * Now save copies of the bitmaps in the relcache entry.  We intentionally
5268 	 * set rd_indexattr last, because that's the one that signals validity of
5269 	 * the values; if we run out of memory before making that copy, we won't
5270 	 * leave the relcache entry looking like the other ones are valid but
5271 	 * empty.
5272 	 */
5273 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
5274 	relation->rd_keyattr = bms_copy(uindexattrs);
5275 	relation->rd_pkattr = bms_copy(pkindexattrs);
5276 	relation->rd_idattr = bms_copy(idindexattrs);
5277 	relation->rd_indexattr = bms_copy(indexattrs);
5278 	MemoryContextSwitchTo(oldcxt);
5279 
5280 	/* We return our original working copy for caller to play with */
5281 	switch (attrKind)
5282 	{
5283 		case INDEX_ATTR_BITMAP_ALL:
5284 			return indexattrs;
5285 		case INDEX_ATTR_BITMAP_KEY:
5286 			return uindexattrs;
5287 		case INDEX_ATTR_BITMAP_PRIMARY_KEY:
5288 			return pkindexattrs;
5289 		case INDEX_ATTR_BITMAP_IDENTITY_KEY:
5290 			return idindexattrs;
5291 		default:
5292 			elog(ERROR, "unknown attrKind %u", attrKind);
5293 			return NULL;
5294 	}
5295 }
5296 
5297 /*
5298  * RelationGetExclusionInfo -- get info about index's exclusion constraint
5299  *
5300  * This should be called only for an index that is known to have an
5301  * associated exclusion constraint.  It returns arrays (palloc'd in caller's
5302  * context) of the exclusion operator OIDs, their underlying functions'
5303  * OIDs, and their strategy numbers in the index's opclasses.  We cache
5304  * all this information since it requires a fair amount of work to get.
5305  */
5306 void
RelationGetExclusionInfo(Relation indexRelation,Oid ** operators,Oid ** procs,uint16 ** strategies)5307 RelationGetExclusionInfo(Relation indexRelation,
5308 						 Oid **operators,
5309 						 Oid **procs,
5310 						 uint16 **strategies)
5311 {
5312 	int			indnkeyatts;
5313 	Oid		   *ops;
5314 	Oid		   *funcs;
5315 	uint16	   *strats;
5316 	Relation	conrel;
5317 	SysScanDesc conscan;
5318 	ScanKeyData skey[1];
5319 	HeapTuple	htup;
5320 	bool		found;
5321 	MemoryContext oldcxt;
5322 	int			i;
5323 
5324 	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
5325 
5326 	/* Allocate result space in caller context */
5327 	*operators = ops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
5328 	*procs = funcs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
5329 	*strategies = strats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
5330 
5331 	/* Quick exit if we have the data cached already */
5332 	if (indexRelation->rd_exclstrats != NULL)
5333 	{
5334 		memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * indnkeyatts);
5335 		memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * indnkeyatts);
5336 		memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * indnkeyatts);
5337 		return;
5338 	}
5339 
5340 	/*
5341 	 * Search pg_constraint for the constraint associated with the index. To
5342 	 * make this not too painfully slow, we use the index on conrelid; that
5343 	 * will hold the parent relation's OID not the index's own OID.
5344 	 *
5345 	 * Note: if we wanted to rely on the constraint name matching the index's
5346 	 * name, we could just do a direct lookup using pg_constraint's unique
5347 	 * index.  For the moment it doesn't seem worth requiring that.
5348 	 */
5349 	ScanKeyInit(&skey[0],
5350 				Anum_pg_constraint_conrelid,
5351 				BTEqualStrategyNumber, F_OIDEQ,
5352 				ObjectIdGetDatum(indexRelation->rd_index->indrelid));
5353 
5354 	conrel = table_open(ConstraintRelationId, AccessShareLock);
5355 	conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
5356 								 NULL, 1, skey);
5357 	found = false;
5358 
5359 	while (HeapTupleIsValid(htup = systable_getnext(conscan)))
5360 	{
5361 		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
5362 		Datum		val;
5363 		bool		isnull;
5364 		ArrayType  *arr;
5365 		int			nelem;
5366 
5367 		/* We want the exclusion constraint owning the index */
5368 		if (conform->contype != CONSTRAINT_EXCLUSION ||
5369 			conform->conindid != RelationGetRelid(indexRelation))
5370 			continue;
5371 
5372 		/* There should be only one */
5373 		if (found)
5374 			elog(ERROR, "unexpected exclusion constraint record found for rel %s",
5375 				 RelationGetRelationName(indexRelation));
5376 		found = true;
5377 
5378 		/* Extract the operator OIDS from conexclop */
5379 		val = fastgetattr(htup,
5380 						  Anum_pg_constraint_conexclop,
5381 						  conrel->rd_att, &isnull);
5382 		if (isnull)
5383 			elog(ERROR, "null conexclop for rel %s",
5384 				 RelationGetRelationName(indexRelation));
5385 
5386 		arr = DatumGetArrayTypeP(val);	/* ensure not toasted */
5387 		nelem = ARR_DIMS(arr)[0];
5388 		if (ARR_NDIM(arr) != 1 ||
5389 			nelem != indnkeyatts ||
5390 			ARR_HASNULL(arr) ||
5391 			ARR_ELEMTYPE(arr) != OIDOID)
5392 			elog(ERROR, "conexclop is not a 1-D Oid array");
5393 
5394 		memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * indnkeyatts);
5395 	}
5396 
5397 	systable_endscan(conscan);
5398 	table_close(conrel, AccessShareLock);
5399 
5400 	if (!found)
5401 		elog(ERROR, "exclusion constraint record missing for rel %s",
5402 			 RelationGetRelationName(indexRelation));
5403 
5404 	/* We need the func OIDs and strategy numbers too */
5405 	for (i = 0; i < indnkeyatts; i++)
5406 	{
5407 		funcs[i] = get_opcode(ops[i]);
5408 		strats[i] = get_op_opfamily_strategy(ops[i],
5409 											 indexRelation->rd_opfamily[i]);
5410 		/* shouldn't fail, since it was checked at index creation */
5411 		if (strats[i] == InvalidStrategy)
5412 			elog(ERROR, "could not find strategy for operator %u in family %u",
5413 				 ops[i], indexRelation->rd_opfamily[i]);
5414 	}
5415 
5416 	/* Save a copy of the results in the relcache entry. */
5417 	oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
5418 	indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
5419 	indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
5420 	indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
5421 	memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * indnkeyatts);
5422 	memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * indnkeyatts);
5423 	memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * indnkeyatts);
5424 	MemoryContextSwitchTo(oldcxt);
5425 }
5426 
5427 /*
5428  * Get publication actions for the given relation.
5429  */
5430 struct PublicationActions *
GetRelationPublicationActions(Relation relation)5431 GetRelationPublicationActions(Relation relation)
5432 {
5433 	List	   *puboids;
5434 	ListCell   *lc;
5435 	MemoryContext oldcxt;
5436 	PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
5437 
5438 	/*
5439 	 * If not publishable, it publishes no actions.  (pgoutput_change() will
5440 	 * ignore it.)
5441 	 */
5442 	if (!is_publishable_relation(relation))
5443 		return pubactions;
5444 
5445 	if (relation->rd_pubactions)
5446 		return memcpy(pubactions, relation->rd_pubactions,
5447 					  sizeof(PublicationActions));
5448 
5449 	/* Fetch the publication membership info. */
5450 	puboids = GetRelationPublications(RelationGetRelid(relation));
5451 	if (relation->rd_rel->relispartition)
5452 	{
5453 		/* Add publications that the ancestors are in too. */
5454 		List	   *ancestors = get_partition_ancestors(RelationGetRelid(relation));
5455 		ListCell   *lc;
5456 
5457 		foreach(lc, ancestors)
5458 		{
5459 			Oid			ancestor = lfirst_oid(lc);
5460 
5461 			puboids = list_concat_unique_oid(puboids,
5462 											 GetRelationPublications(ancestor));
5463 		}
5464 	}
5465 	puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
5466 
5467 	foreach(lc, puboids)
5468 	{
5469 		Oid			pubid = lfirst_oid(lc);
5470 		HeapTuple	tup;
5471 		Form_pg_publication pubform;
5472 
5473 		tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
5474 
5475 		if (!HeapTupleIsValid(tup))
5476 			elog(ERROR, "cache lookup failed for publication %u", pubid);
5477 
5478 		pubform = (Form_pg_publication) GETSTRUCT(tup);
5479 
5480 		pubactions->pubinsert |= pubform->pubinsert;
5481 		pubactions->pubupdate |= pubform->pubupdate;
5482 		pubactions->pubdelete |= pubform->pubdelete;
5483 		pubactions->pubtruncate |= pubform->pubtruncate;
5484 
5485 		ReleaseSysCache(tup);
5486 
5487 		/*
5488 		 * If we know everything is replicated, there is no point to check for
5489 		 * other publications.
5490 		 */
5491 		if (pubactions->pubinsert && pubactions->pubupdate &&
5492 			pubactions->pubdelete && pubactions->pubtruncate)
5493 			break;
5494 	}
5495 
5496 	if (relation->rd_pubactions)
5497 	{
5498 		pfree(relation->rd_pubactions);
5499 		relation->rd_pubactions = NULL;
5500 	}
5501 
5502 	/* Now save copy of the actions in the relcache entry. */
5503 	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
5504 	relation->rd_pubactions = palloc(sizeof(PublicationActions));
5505 	memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
5506 	MemoryContextSwitchTo(oldcxt);
5507 
5508 	return pubactions;
5509 }
5510 
5511 /*
5512  * RelationGetIndexRawAttOptions -- get AM/opclass-specific options for the index
5513  */
5514 Datum *
RelationGetIndexRawAttOptions(Relation indexrel)5515 RelationGetIndexRawAttOptions(Relation indexrel)
5516 {
5517 	Oid			indexrelid = RelationGetRelid(indexrel);
5518 	int16		natts = RelationGetNumberOfAttributes(indexrel);
5519 	Datum	   *options = NULL;
5520 	int16		attnum;
5521 
5522 	for (attnum = 1; attnum <= natts; attnum++)
5523 	{
5524 		if (indexrel->rd_indam->amoptsprocnum == 0)
5525 			continue;
5526 
5527 		if (!OidIsValid(index_getprocid(indexrel, attnum,
5528 										indexrel->rd_indam->amoptsprocnum)))
5529 			continue;
5530 
5531 		if (!options)
5532 			options = palloc0(sizeof(Datum) * natts);
5533 
5534 		options[attnum - 1] = get_attoptions(indexrelid, attnum);
5535 	}
5536 
5537 	return options;
5538 }
5539 
5540 static bytea **
CopyIndexAttOptions(bytea ** srcopts,int natts)5541 CopyIndexAttOptions(bytea **srcopts, int natts)
5542 {
5543 	bytea	  **opts = palloc(sizeof(*opts) * natts);
5544 
5545 	for (int i = 0; i < natts; i++)
5546 	{
5547 		bytea	   *opt = srcopts[i];
5548 
5549 		opts[i] = !opt ? NULL : (bytea *)
5550 			DatumGetPointer(datumCopy(PointerGetDatum(opt), false, -1));
5551 	}
5552 
5553 	return opts;
5554 }
5555 
5556 /*
5557  * RelationGetIndexAttOptions
5558  *		get AM/opclass-specific options for an index parsed into a binary form
5559  */
5560 bytea	  **
RelationGetIndexAttOptions(Relation relation,bool copy)5561 RelationGetIndexAttOptions(Relation relation, bool copy)
5562 {
5563 	MemoryContext oldcxt;
5564 	bytea	  **opts = relation->rd_opcoptions;
5565 	Oid			relid = RelationGetRelid(relation);
5566 	int			natts = RelationGetNumberOfAttributes(relation);	/* XXX
5567 																	 * IndexRelationGetNumberOfKeyAttributes */
5568 	int			i;
5569 
5570 	/* Try to copy cached options. */
5571 	if (opts)
5572 		return copy ? CopyIndexAttOptions(opts, natts) : opts;
5573 
5574 	/* Get and parse opclass options. */
5575 	opts = palloc0(sizeof(*opts) * natts);
5576 
5577 	for (i = 0; i < natts; i++)
5578 	{
5579 		if (criticalRelcachesBuilt && relid != AttributeRelidNumIndexId)
5580 		{
5581 			Datum		attoptions = get_attoptions(relid, i + 1);
5582 
5583 			opts[i] = index_opclass_options(relation, i + 1, attoptions, false);
5584 
5585 			if (attoptions != (Datum) 0)
5586 				pfree(DatumGetPointer(attoptions));
5587 		}
5588 	}
5589 
5590 	/* Copy parsed options to the cache. */
5591 	oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
5592 	relation->rd_opcoptions = CopyIndexAttOptions(opts, natts);
5593 	MemoryContextSwitchTo(oldcxt);
5594 
5595 	if (copy)
5596 		return opts;
5597 
5598 	for (i = 0; i < natts; i++)
5599 	{
5600 		if (opts[i])
5601 			pfree(opts[i]);
5602 	}
5603 
5604 	pfree(opts);
5605 
5606 	return relation->rd_opcoptions;
5607 }
5608 
5609 /*
5610  * Routines to support ereport() reports of relation-related errors
5611  *
5612  * These could have been put into elog.c, but it seems like a module layering
5613  * violation to have elog.c calling relcache or syscache stuff --- and we
5614  * definitely don't want elog.h including rel.h.  So we put them here.
5615  */
5616 
5617 /*
5618  * errtable --- stores schema_name and table_name of a table
5619  * within the current errordata.
5620  */
5621 int
errtable(Relation rel)5622 errtable(Relation rel)
5623 {
5624 	err_generic_string(PG_DIAG_SCHEMA_NAME,
5625 					   get_namespace_name(RelationGetNamespace(rel)));
5626 	err_generic_string(PG_DIAG_TABLE_NAME, RelationGetRelationName(rel));
5627 
5628 	return 0;					/* return value does not matter */
5629 }
5630 
5631 /*
5632  * errtablecol --- stores schema_name, table_name and column_name
5633  * of a table column within the current errordata.
5634  *
5635  * The column is specified by attribute number --- for most callers, this is
5636  * easier and less error-prone than getting the column name for themselves.
5637  */
5638 int
errtablecol(Relation rel,int attnum)5639 errtablecol(Relation rel, int attnum)
5640 {
5641 	TupleDesc	reldesc = RelationGetDescr(rel);
5642 	const char *colname;
5643 
5644 	/* Use reldesc if it's a user attribute, else consult the catalogs */
5645 	if (attnum > 0 && attnum <= reldesc->natts)
5646 		colname = NameStr(TupleDescAttr(reldesc, attnum - 1)->attname);
5647 	else
5648 		colname = get_attname(RelationGetRelid(rel), attnum, false);
5649 
5650 	return errtablecolname(rel, colname);
5651 }
5652 
5653 /*
5654  * errtablecolname --- stores schema_name, table_name and column_name
5655  * of a table column within the current errordata, where the column name is
5656  * given directly rather than extracted from the relation's catalog data.
5657  *
5658  * Don't use this directly unless errtablecol() is inconvenient for some
5659  * reason.  This might possibly be needed during intermediate states in ALTER
5660  * TABLE, for instance.
5661  */
5662 int
errtablecolname(Relation rel,const char * colname)5663 errtablecolname(Relation rel, const char *colname)
5664 {
5665 	errtable(rel);
5666 	err_generic_string(PG_DIAG_COLUMN_NAME, colname);
5667 
5668 	return 0;					/* return value does not matter */
5669 }
5670 
5671 /*
5672  * errtableconstraint --- stores schema_name, table_name and constraint_name
5673  * of a table-related constraint within the current errordata.
5674  */
5675 int
errtableconstraint(Relation rel,const char * conname)5676 errtableconstraint(Relation rel, const char *conname)
5677 {
5678 	errtable(rel);
5679 	err_generic_string(PG_DIAG_CONSTRAINT_NAME, conname);
5680 
5681 	return 0;					/* return value does not matter */
5682 }
5683 
5684 
5685 /*
5686  *	load_relcache_init_file, write_relcache_init_file
5687  *
5688  *		In late 1992, we started regularly having databases with more than
5689  *		a thousand classes in them.  With this number of classes, it became
5690  *		critical to do indexed lookups on the system catalogs.
5691  *
5692  *		Bootstrapping these lookups is very hard.  We want to be able to
5693  *		use an index on pg_attribute, for example, but in order to do so,
5694  *		we must have read pg_attribute for the attributes in the index,
5695  *		which implies that we need to use the index.
5696  *
5697  *		In order to get around the problem, we do the following:
5698  *
5699  *		   +  When the database system is initialized (at initdb time), we
5700  *			  don't use indexes.  We do sequential scans.
5701  *
5702  *		   +  When the backend is started up in normal mode, we load an image
5703  *			  of the appropriate relation descriptors, in internal format,
5704  *			  from an initialization file in the data/base/... directory.
5705  *
5706  *		   +  If the initialization file isn't there, then we create the
5707  *			  relation descriptors using sequential scans and write 'em to
5708  *			  the initialization file for use by subsequent backends.
5709  *
5710  *		As of Postgres 9.0, there is one local initialization file in each
5711  *		database, plus one shared initialization file for shared catalogs.
5712  *
5713  *		We could dispense with the initialization files and just build the
5714  *		critical reldescs the hard way on every backend startup, but that
5715  *		slows down backend startup noticeably.
5716  *
5717  *		We can in fact go further, and save more relcache entries than
5718  *		just the ones that are absolutely critical; this allows us to speed
5719  *		up backend startup by not having to build such entries the hard way.
5720  *		Presently, all the catalog and index entries that are referred to
5721  *		by catcaches are stored in the initialization files.
5722  *
5723  *		The same mechanism that detects when catcache and relcache entries
5724  *		need to be invalidated (due to catalog updates) also arranges to
5725  *		unlink the initialization files when the contents may be out of date.
5726  *		The files will then be rebuilt during the next backend startup.
5727  */
5728 
5729 /*
5730  * load_relcache_init_file -- attempt to load cache from the shared
5731  * or local cache init file
5732  *
5733  * If successful, return true and set criticalRelcachesBuilt or
5734  * criticalSharedRelcachesBuilt to true.
5735  * If not successful, return false.
5736  *
5737  * NOTE: we assume we are already switched into CacheMemoryContext.
5738  */
5739 static bool
load_relcache_init_file(bool shared)5740 load_relcache_init_file(bool shared)
5741 {
5742 	FILE	   *fp;
5743 	char		initfilename[MAXPGPATH];
5744 	Relation   *rels;
5745 	int			relno,
5746 				num_rels,
5747 				max_rels,
5748 				nailed_rels,
5749 				nailed_indexes,
5750 				magic;
5751 	int			i;
5752 
5753 	if (shared)
5754 		snprintf(initfilename, sizeof(initfilename), "global/%s",
5755 				 RELCACHE_INIT_FILENAME);
5756 	else
5757 		snprintf(initfilename, sizeof(initfilename), "%s/%s",
5758 				 DatabasePath, RELCACHE_INIT_FILENAME);
5759 
5760 	fp = AllocateFile(initfilename, PG_BINARY_R);
5761 	if (fp == NULL)
5762 		return false;
5763 
5764 	/*
5765 	 * Read the index relcache entries from the file.  Note we will not enter
5766 	 * any of them into the cache if the read fails partway through; this
5767 	 * helps to guard against broken init files.
5768 	 */
5769 	max_rels = 100;
5770 	rels = (Relation *) palloc(max_rels * sizeof(Relation));
5771 	num_rels = 0;
5772 	nailed_rels = nailed_indexes = 0;
5773 
5774 	/* check for correct magic number (compatible version) */
5775 	if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
5776 		goto read_failed;
5777 	if (magic != RELCACHE_INIT_FILEMAGIC)
5778 		goto read_failed;
5779 
5780 	for (relno = 0;; relno++)
5781 	{
5782 		Size		len;
5783 		size_t		nread;
5784 		Relation	rel;
5785 		Form_pg_class relform;
5786 		bool		has_not_null;
5787 
5788 		/* first read the relation descriptor length */
5789 		nread = fread(&len, 1, sizeof(len), fp);
5790 		if (nread != sizeof(len))
5791 		{
5792 			if (nread == 0)
5793 				break;			/* end of file */
5794 			goto read_failed;
5795 		}
5796 
5797 		/* safety check for incompatible relcache layout */
5798 		if (len != sizeof(RelationData))
5799 			goto read_failed;
5800 
5801 		/* allocate another relcache header */
5802 		if (num_rels >= max_rels)
5803 		{
5804 			max_rels *= 2;
5805 			rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
5806 		}
5807 
5808 		rel = rels[num_rels++] = (Relation) palloc(len);
5809 
5810 		/* then, read the Relation structure */
5811 		if (fread(rel, 1, len, fp) != len)
5812 			goto read_failed;
5813 
5814 		/* next read the relation tuple form */
5815 		if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5816 			goto read_failed;
5817 
5818 		relform = (Form_pg_class) palloc(len);
5819 		if (fread(relform, 1, len, fp) != len)
5820 			goto read_failed;
5821 
5822 		rel->rd_rel = relform;
5823 
5824 		/* initialize attribute tuple forms */
5825 		rel->rd_att = CreateTemplateTupleDesc(relform->relnatts);
5826 		rel->rd_att->tdrefcount = 1;	/* mark as refcounted */
5827 
5828 		rel->rd_att->tdtypeid = relform->reltype;
5829 		rel->rd_att->tdtypmod = -1; /* unnecessary, but... */
5830 
5831 		/* next read all the attribute tuple form data entries */
5832 		has_not_null = false;
5833 		for (i = 0; i < relform->relnatts; i++)
5834 		{
5835 			Form_pg_attribute attr = TupleDescAttr(rel->rd_att, i);
5836 
5837 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5838 				goto read_failed;
5839 			if (len != ATTRIBUTE_FIXED_PART_SIZE)
5840 				goto read_failed;
5841 			if (fread(attr, 1, len, fp) != len)
5842 				goto read_failed;
5843 
5844 			has_not_null |= attr->attnotnull;
5845 		}
5846 
5847 		/* next read the access method specific field */
5848 		if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5849 			goto read_failed;
5850 		if (len > 0)
5851 		{
5852 			rel->rd_options = palloc(len);
5853 			if (fread(rel->rd_options, 1, len, fp) != len)
5854 				goto read_failed;
5855 			if (len != VARSIZE(rel->rd_options))
5856 				goto read_failed;	/* sanity check */
5857 		}
5858 		else
5859 		{
5860 			rel->rd_options = NULL;
5861 		}
5862 
5863 		/* mark not-null status */
5864 		if (has_not_null)
5865 		{
5866 			TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
5867 
5868 			constr->has_not_null = true;
5869 			rel->rd_att->constr = constr;
5870 		}
5871 
5872 		/*
5873 		 * If it's an index, there's more to do.  Note we explicitly ignore
5874 		 * partitioned indexes here.
5875 		 */
5876 		if (rel->rd_rel->relkind == RELKIND_INDEX)
5877 		{
5878 			MemoryContext indexcxt;
5879 			Oid		   *opfamily;
5880 			Oid		   *opcintype;
5881 			RegProcedure *support;
5882 			int			nsupport;
5883 			int16	   *indoption;
5884 			Oid		   *indcollation;
5885 
5886 			/* Count nailed indexes to ensure we have 'em all */
5887 			if (rel->rd_isnailed)
5888 				nailed_indexes++;
5889 
5890 			/* next, read the pg_index tuple */
5891 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5892 				goto read_failed;
5893 
5894 			rel->rd_indextuple = (HeapTuple) palloc(len);
5895 			if (fread(rel->rd_indextuple, 1, len, fp) != len)
5896 				goto read_failed;
5897 
5898 			/* Fix up internal pointers in the tuple -- see heap_copytuple */
5899 			rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
5900 			rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
5901 
5902 			/*
5903 			 * prepare index info context --- parameters should match
5904 			 * RelationInitIndexAccessInfo
5905 			 */
5906 			indexcxt = AllocSetContextCreate(CacheMemoryContext,
5907 											 "index info",
5908 											 ALLOCSET_SMALL_SIZES);
5909 			rel->rd_indexcxt = indexcxt;
5910 			MemoryContextCopyAndSetIdentifier(indexcxt,
5911 											  RelationGetRelationName(rel));
5912 
5913 			/*
5914 			 * Now we can fetch the index AM's API struct.  (We can't store
5915 			 * that in the init file, since it contains function pointers that
5916 			 * might vary across server executions.  Fortunately, it should be
5917 			 * safe to call the amhandler even while bootstrapping indexes.)
5918 			 */
5919 			InitIndexAmRoutine(rel);
5920 
5921 			/* next, read the vector of opfamily OIDs */
5922 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5923 				goto read_failed;
5924 
5925 			opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
5926 			if (fread(opfamily, 1, len, fp) != len)
5927 				goto read_failed;
5928 
5929 			rel->rd_opfamily = opfamily;
5930 
5931 			/* next, read the vector of opcintype OIDs */
5932 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5933 				goto read_failed;
5934 
5935 			opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
5936 			if (fread(opcintype, 1, len, fp) != len)
5937 				goto read_failed;
5938 
5939 			rel->rd_opcintype = opcintype;
5940 
5941 			/* next, read the vector of support procedure OIDs */
5942 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5943 				goto read_failed;
5944 			support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
5945 			if (fread(support, 1, len, fp) != len)
5946 				goto read_failed;
5947 
5948 			rel->rd_support = support;
5949 
5950 			/* next, read the vector of collation OIDs */
5951 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5952 				goto read_failed;
5953 
5954 			indcollation = (Oid *) MemoryContextAlloc(indexcxt, len);
5955 			if (fread(indcollation, 1, len, fp) != len)
5956 				goto read_failed;
5957 
5958 			rel->rd_indcollation = indcollation;
5959 
5960 			/* finally, read the vector of indoption values */
5961 			if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5962 				goto read_failed;
5963 
5964 			indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
5965 			if (fread(indoption, 1, len, fp) != len)
5966 				goto read_failed;
5967 
5968 			rel->rd_indoption = indoption;
5969 
5970 			/* finally, read the vector of opcoptions values */
5971 			rel->rd_opcoptions = (bytea **)
5972 				MemoryContextAllocZero(indexcxt, sizeof(*rel->rd_opcoptions) * relform->relnatts);
5973 
5974 			for (i = 0; i < relform->relnatts; i++)
5975 			{
5976 				if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
5977 					goto read_failed;
5978 
5979 				if (len > 0)
5980 				{
5981 					rel->rd_opcoptions[i] = (bytea *) MemoryContextAlloc(indexcxt, len);
5982 					if (fread(rel->rd_opcoptions[i], 1, len, fp) != len)
5983 						goto read_failed;
5984 				}
5985 			}
5986 
5987 			/* set up zeroed fmgr-info vector */
5988 			nsupport = relform->relnatts * rel->rd_indam->amsupport;
5989 			rel->rd_supportinfo = (FmgrInfo *)
5990 				MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
5991 		}
5992 		else
5993 		{
5994 			/* Count nailed rels to ensure we have 'em all */
5995 			if (rel->rd_isnailed)
5996 				nailed_rels++;
5997 
5998 			/* Load table AM data */
5999 			if (rel->rd_rel->relkind == RELKIND_RELATION ||
6000 				rel->rd_rel->relkind == RELKIND_SEQUENCE ||
6001 				rel->rd_rel->relkind == RELKIND_TOASTVALUE ||
6002 				rel->rd_rel->relkind == RELKIND_MATVIEW)
6003 				RelationInitTableAccessMethod(rel);
6004 
6005 			Assert(rel->rd_index == NULL);
6006 			Assert(rel->rd_indextuple == NULL);
6007 			Assert(rel->rd_indexcxt == NULL);
6008 			Assert(rel->rd_indam == NULL);
6009 			Assert(rel->rd_opfamily == NULL);
6010 			Assert(rel->rd_opcintype == NULL);
6011 			Assert(rel->rd_support == NULL);
6012 			Assert(rel->rd_supportinfo == NULL);
6013 			Assert(rel->rd_indoption == NULL);
6014 			Assert(rel->rd_indcollation == NULL);
6015 			Assert(rel->rd_opcoptions == NULL);
6016 		}
6017 
6018 		/*
6019 		 * Rules and triggers are not saved (mainly because the internal
6020 		 * format is complex and subject to change).  They must be rebuilt if
6021 		 * needed by RelationCacheInitializePhase3.  This is not expected to
6022 		 * be a big performance hit since few system catalogs have such. Ditto
6023 		 * for RLS policy data, partition info, index expressions, predicates,
6024 		 * exclusion info, and FDW info.
6025 		 */
6026 		rel->rd_rules = NULL;
6027 		rel->rd_rulescxt = NULL;
6028 		rel->trigdesc = NULL;
6029 		rel->rd_rsdesc = NULL;
6030 		rel->rd_partkey = NULL;
6031 		rel->rd_partkeycxt = NULL;
6032 		rel->rd_partdesc = NULL;
6033 		rel->rd_pdcxt = NULL;
6034 		rel->rd_partcheck = NIL;
6035 		rel->rd_partcheckvalid = false;
6036 		rel->rd_partcheckcxt = NULL;
6037 		rel->rd_indexprs = NIL;
6038 		rel->rd_indpred = NIL;
6039 		rel->rd_exclops = NULL;
6040 		rel->rd_exclprocs = NULL;
6041 		rel->rd_exclstrats = NULL;
6042 		rel->rd_fdwroutine = NULL;
6043 
6044 		/*
6045 		 * Reset transient-state fields in the relcache entry
6046 		 */
6047 		rel->rd_smgr = NULL;
6048 		if (rel->rd_isnailed)
6049 			rel->rd_refcnt = 1;
6050 		else
6051 			rel->rd_refcnt = 0;
6052 		rel->rd_indexvalid = false;
6053 		rel->rd_indexlist = NIL;
6054 		rel->rd_pkindex = InvalidOid;
6055 		rel->rd_replidindex = InvalidOid;
6056 		rel->rd_indexattr = NULL;
6057 		rel->rd_keyattr = NULL;
6058 		rel->rd_pkattr = NULL;
6059 		rel->rd_idattr = NULL;
6060 		rel->rd_pubactions = NULL;
6061 		rel->rd_statvalid = false;
6062 		rel->rd_statlist = NIL;
6063 		rel->rd_fkeyvalid = false;
6064 		rel->rd_fkeylist = NIL;
6065 		rel->rd_createSubid = InvalidSubTransactionId;
6066 		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
6067 		rel->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
6068 		rel->rd_droppedSubid = InvalidSubTransactionId;
6069 		rel->rd_amcache = NULL;
6070 		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
6071 
6072 		/*
6073 		 * Recompute lock and physical addressing info.  This is needed in
6074 		 * case the pg_internal.init file was copied from some other database
6075 		 * by CREATE DATABASE.
6076 		 */
6077 		RelationInitLockInfo(rel);
6078 		RelationInitPhysicalAddr(rel);
6079 	}
6080 
6081 	/*
6082 	 * We reached the end of the init file without apparent problem.  Did we
6083 	 * get the right number of nailed items?  This is a useful crosscheck in
6084 	 * case the set of critical rels or indexes changes.  However, that should
6085 	 * not happen in a normally-running system, so let's bleat if it does.
6086 	 *
6087 	 * For the shared init file, we're called before client authentication is
6088 	 * done, which means that elog(WARNING) will go only to the postmaster
6089 	 * log, where it's easily missed.  To ensure that developers notice bad
6090 	 * values of NUM_CRITICAL_SHARED_RELS/NUM_CRITICAL_SHARED_INDEXES, we put
6091 	 * an Assert(false) there.
6092 	 */
6093 	if (shared)
6094 	{
6095 		if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
6096 			nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
6097 		{
6098 			elog(WARNING, "found %d nailed shared rels and %d nailed shared indexes in init file, but expected %d and %d respectively",
6099 				 nailed_rels, nailed_indexes,
6100 				 NUM_CRITICAL_SHARED_RELS, NUM_CRITICAL_SHARED_INDEXES);
6101 			/* Make sure we get developers' attention about this */
6102 			Assert(false);
6103 			/* In production builds, recover by bootstrapping the relcache */
6104 			goto read_failed;
6105 		}
6106 	}
6107 	else
6108 	{
6109 		if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
6110 			nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
6111 		{
6112 			elog(WARNING, "found %d nailed rels and %d nailed indexes in init file, but expected %d and %d respectively",
6113 				 nailed_rels, nailed_indexes,
6114 				 NUM_CRITICAL_LOCAL_RELS, NUM_CRITICAL_LOCAL_INDEXES);
6115 			/* We don't need an Assert() in this case */
6116 			goto read_failed;
6117 		}
6118 	}
6119 
6120 	/*
6121 	 * OK, all appears well.
6122 	 *
6123 	 * Now insert all the new relcache entries into the cache.
6124 	 */
6125 	for (relno = 0; relno < num_rels; relno++)
6126 	{
6127 		RelationCacheInsert(rels[relno], false);
6128 	}
6129 
6130 	pfree(rels);
6131 	FreeFile(fp);
6132 
6133 	if (shared)
6134 		criticalSharedRelcachesBuilt = true;
6135 	else
6136 		criticalRelcachesBuilt = true;
6137 	return true;
6138 
6139 	/*
6140 	 * init file is broken, so do it the hard way.  We don't bother trying to
6141 	 * free the clutter we just allocated; it's not in the relcache so it
6142 	 * won't hurt.
6143 	 */
6144 read_failed:
6145 	pfree(rels);
6146 	FreeFile(fp);
6147 
6148 	return false;
6149 }
6150 
6151 /*
6152  * Write out a new initialization file with the current contents
6153  * of the relcache (either shared rels or local rels, as indicated).
6154  */
6155 static void
write_relcache_init_file(bool shared)6156 write_relcache_init_file(bool shared)
6157 {
6158 	FILE	   *fp;
6159 	char		tempfilename[MAXPGPATH];
6160 	char		finalfilename[MAXPGPATH];
6161 	int			magic;
6162 	HASH_SEQ_STATUS status;
6163 	RelIdCacheEnt *idhentry;
6164 	int			i;
6165 
6166 	/*
6167 	 * If we have already received any relcache inval events, there's no
6168 	 * chance of succeeding so we may as well skip the whole thing.
6169 	 */
6170 	if (relcacheInvalsReceived != 0L)
6171 		return;
6172 
6173 	/*
6174 	 * We must write a temporary file and rename it into place. Otherwise,
6175 	 * another backend starting at about the same time might crash trying to
6176 	 * read the partially-complete file.
6177 	 */
6178 	if (shared)
6179 	{
6180 		snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
6181 				 RELCACHE_INIT_FILENAME, MyProcPid);
6182 		snprintf(finalfilename, sizeof(finalfilename), "global/%s",
6183 				 RELCACHE_INIT_FILENAME);
6184 	}
6185 	else
6186 	{
6187 		snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
6188 				 DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
6189 		snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
6190 				 DatabasePath, RELCACHE_INIT_FILENAME);
6191 	}
6192 
6193 	unlink(tempfilename);		/* in case it exists w/wrong permissions */
6194 
6195 	fp = AllocateFile(tempfilename, PG_BINARY_W);
6196 	if (fp == NULL)
6197 	{
6198 		/*
6199 		 * We used to consider this a fatal error, but we might as well
6200 		 * continue with backend startup ...
6201 		 */
6202 		ereport(WARNING,
6203 				(errcode_for_file_access(),
6204 				 errmsg("could not create relation-cache initialization file \"%s\": %m",
6205 						tempfilename),
6206 				 errdetail("Continuing anyway, but there's something wrong.")));
6207 		return;
6208 	}
6209 
6210 	/*
6211 	 * Write a magic number to serve as a file version identifier.  We can
6212 	 * change the magic number whenever the relcache layout changes.
6213 	 */
6214 	magic = RELCACHE_INIT_FILEMAGIC;
6215 	if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
6216 		elog(FATAL, "could not write init file");
6217 
6218 	/*
6219 	 * Write all the appropriate reldescs (in no particular order).
6220 	 */
6221 	hash_seq_init(&status, RelationIdCache);
6222 
6223 	while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
6224 	{
6225 		Relation	rel = idhentry->reldesc;
6226 		Form_pg_class relform = rel->rd_rel;
6227 
6228 		/* ignore if not correct group */
6229 		if (relform->relisshared != shared)
6230 			continue;
6231 
6232 		/*
6233 		 * Ignore if not supposed to be in init file.  We can allow any shared
6234 		 * relation that's been loaded so far to be in the shared init file,
6235 		 * but unshared relations must be ones that should be in the local
6236 		 * file per RelationIdIsInInitFile.  (Note: if you want to change the
6237 		 * criterion for rels to be kept in the init file, see also inval.c.
6238 		 * The reason for filtering here is to be sure that we don't put
6239 		 * anything into the local init file for which a relcache inval would
6240 		 * not cause invalidation of that init file.)
6241 		 */
6242 		if (!shared && !RelationIdIsInInitFile(RelationGetRelid(rel)))
6243 		{
6244 			/* Nailed rels had better get stored. */
6245 			Assert(!rel->rd_isnailed);
6246 			continue;
6247 		}
6248 
6249 		/* first write the relcache entry proper */
6250 		write_item(rel, sizeof(RelationData), fp);
6251 
6252 		/* next write the relation tuple form */
6253 		write_item(relform, CLASS_TUPLE_SIZE, fp);
6254 
6255 		/* next, do all the attribute tuple form data entries */
6256 		for (i = 0; i < relform->relnatts; i++)
6257 		{
6258 			write_item(TupleDescAttr(rel->rd_att, i),
6259 					   ATTRIBUTE_FIXED_PART_SIZE, fp);
6260 		}
6261 
6262 		/* next, do the access method specific field */
6263 		write_item(rel->rd_options,
6264 				   (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
6265 				   fp);
6266 
6267 		/*
6268 		 * If it's an index, there's more to do. Note we explicitly ignore
6269 		 * partitioned indexes here.
6270 		 */
6271 		if (rel->rd_rel->relkind == RELKIND_INDEX)
6272 		{
6273 			/* write the pg_index tuple */
6274 			/* we assume this was created by heap_copytuple! */
6275 			write_item(rel->rd_indextuple,
6276 					   HEAPTUPLESIZE + rel->rd_indextuple->t_len,
6277 					   fp);
6278 
6279 			/* next, write the vector of opfamily OIDs */
6280 			write_item(rel->rd_opfamily,
6281 					   relform->relnatts * sizeof(Oid),
6282 					   fp);
6283 
6284 			/* next, write the vector of opcintype OIDs */
6285 			write_item(rel->rd_opcintype,
6286 					   relform->relnatts * sizeof(Oid),
6287 					   fp);
6288 
6289 			/* next, write the vector of support procedure OIDs */
6290 			write_item(rel->rd_support,
6291 					   relform->relnatts * (rel->rd_indam->amsupport * sizeof(RegProcedure)),
6292 					   fp);
6293 
6294 			/* next, write the vector of collation OIDs */
6295 			write_item(rel->rd_indcollation,
6296 					   relform->relnatts * sizeof(Oid),
6297 					   fp);
6298 
6299 			/* finally, write the vector of indoption values */
6300 			write_item(rel->rd_indoption,
6301 					   relform->relnatts * sizeof(int16),
6302 					   fp);
6303 
6304 			Assert(rel->rd_opcoptions);
6305 
6306 			/* finally, write the vector of opcoptions values */
6307 			for (i = 0; i < relform->relnatts; i++)
6308 			{
6309 				bytea	   *opt = rel->rd_opcoptions[i];
6310 
6311 				write_item(opt, opt ? VARSIZE(opt) : 0, fp);
6312 			}
6313 		}
6314 	}
6315 
6316 	if (FreeFile(fp))
6317 		elog(FATAL, "could not write init file");
6318 
6319 	/*
6320 	 * Now we have to check whether the data we've so painstakingly
6321 	 * accumulated is already obsolete due to someone else's just-committed
6322 	 * catalog changes.  If so, we just delete the temp file and leave it to
6323 	 * the next backend to try again.  (Our own relcache entries will be
6324 	 * updated by SI message processing, but we can't be sure whether what we
6325 	 * wrote out was up-to-date.)
6326 	 *
6327 	 * This mustn't run concurrently with the code that unlinks an init file
6328 	 * and sends SI messages, so grab a serialization lock for the duration.
6329 	 */
6330 	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
6331 
6332 	/* Make sure we have seen all incoming SI messages */
6333 	AcceptInvalidationMessages();
6334 
6335 	/*
6336 	 * If we have received any SI relcache invals since backend start, assume
6337 	 * we may have written out-of-date data.
6338 	 */
6339 	if (relcacheInvalsReceived == 0L)
6340 	{
6341 		/*
6342 		 * OK, rename the temp file to its final name, deleting any
6343 		 * previously-existing init file.
6344 		 *
6345 		 * Note: a failure here is possible under Cygwin, if some other
6346 		 * backend is holding open an unlinked-but-not-yet-gone init file. So
6347 		 * treat this as a noncritical failure; just remove the useless temp
6348 		 * file on failure.
6349 		 */
6350 		if (rename(tempfilename, finalfilename) < 0)
6351 			unlink(tempfilename);
6352 	}
6353 	else
6354 	{
6355 		/* Delete the already-obsolete temp file */
6356 		unlink(tempfilename);
6357 	}
6358 
6359 	LWLockRelease(RelCacheInitLock);
6360 }
6361 
6362 /* write a chunk of data preceded by its length */
6363 static void
write_item(const void * data,Size len,FILE * fp)6364 write_item(const void *data, Size len, FILE *fp)
6365 {
6366 	if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
6367 		elog(FATAL, "could not write init file");
6368 	if (fwrite(data, 1, len, fp) != len)
6369 		elog(FATAL, "could not write init file");
6370 }
6371 
6372 /*
6373  * Determine whether a given relation (identified by OID) is one of the ones
6374  * we should store in a relcache init file.
6375  *
6376  * We must cache all nailed rels, and for efficiency we should cache every rel
6377  * that supports a syscache.  The former set is almost but not quite a subset
6378  * of the latter. The special cases are relations where
6379  * RelationCacheInitializePhase2/3 chooses to nail for efficiency reasons, but
6380  * which do not support any syscache.
6381  */
6382 bool
RelationIdIsInInitFile(Oid relationId)6383 RelationIdIsInInitFile(Oid relationId)
6384 {
6385 	if (relationId == SharedSecLabelRelationId ||
6386 		relationId == TriggerRelidNameIndexId ||
6387 		relationId == DatabaseNameIndexId ||
6388 		relationId == SharedSecLabelObjectIndexId)
6389 	{
6390 		/*
6391 		 * If this Assert fails, we don't need the applicable special case
6392 		 * anymore.
6393 		 */
6394 		Assert(!RelationSupportsSysCache(relationId));
6395 		return true;
6396 	}
6397 	return RelationSupportsSysCache(relationId);
6398 }
6399 
6400 /*
6401  * Invalidate (remove) the init file during commit of a transaction that
6402  * changed one or more of the relation cache entries that are kept in the
6403  * local init file.
6404  *
6405  * To be safe against concurrent inspection or rewriting of the init file,
6406  * we must take RelCacheInitLock, then remove the old init file, then send
6407  * the SI messages that include relcache inval for such relations, and then
6408  * release RelCacheInitLock.  This serializes the whole affair against
6409  * write_relcache_init_file, so that we can be sure that any other process
6410  * that's concurrently trying to create a new init file won't move an
6411  * already-stale version into place after we unlink.  Also, because we unlink
6412  * before sending the SI messages, a backend that's currently starting cannot
6413  * read the now-obsolete init file and then miss the SI messages that will
6414  * force it to update its relcache entries.  (This works because the backend
6415  * startup sequence gets into the sinval array before trying to load the init
6416  * file.)
6417  *
6418  * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
6419  * then release the lock in RelationCacheInitFilePostInvalidate.  Caller must
6420  * send any pending SI messages between those calls.
6421  */
6422 void
RelationCacheInitFilePreInvalidate(void)6423 RelationCacheInitFilePreInvalidate(void)
6424 {
6425 	char		localinitfname[MAXPGPATH];
6426 	char		sharedinitfname[MAXPGPATH];
6427 
6428 	if (DatabasePath)
6429 		snprintf(localinitfname, sizeof(localinitfname), "%s/%s",
6430 				 DatabasePath, RELCACHE_INIT_FILENAME);
6431 	snprintf(sharedinitfname, sizeof(sharedinitfname), "global/%s",
6432 			 RELCACHE_INIT_FILENAME);
6433 
6434 	LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
6435 
6436 	/*
6437 	 * The files might not be there if no backend has been started since the
6438 	 * last removal.  But complain about failures other than ENOENT with
6439 	 * ERROR.  Fortunately, it's not too late to abort the transaction if we
6440 	 * can't get rid of the would-be-obsolete init file.
6441 	 */
6442 	if (DatabasePath)
6443 		unlink_initfile(localinitfname, ERROR);
6444 	unlink_initfile(sharedinitfname, ERROR);
6445 }
6446 
6447 void
RelationCacheInitFilePostInvalidate(void)6448 RelationCacheInitFilePostInvalidate(void)
6449 {
6450 	LWLockRelease(RelCacheInitLock);
6451 }
6452 
6453 /*
6454  * Remove the init files during postmaster startup.
6455  *
6456  * We used to keep the init files across restarts, but that is unsafe in PITR
6457  * scenarios, and even in simple crash-recovery cases there are windows for
6458  * the init files to become out-of-sync with the database.  So now we just
6459  * remove them during startup and expect the first backend launch to rebuild
6460  * them.  Of course, this has to happen in each database of the cluster.
6461  */
6462 void
RelationCacheInitFileRemove(void)6463 RelationCacheInitFileRemove(void)
6464 {
6465 	const char *tblspcdir = "pg_tblspc";
6466 	DIR		   *dir;
6467 	struct dirent *de;
6468 	char		path[MAXPGPATH + 10 + sizeof(TABLESPACE_VERSION_DIRECTORY)];
6469 
6470 	snprintf(path, sizeof(path), "global/%s",
6471 			 RELCACHE_INIT_FILENAME);
6472 	unlink_initfile(path, LOG);
6473 
6474 	/* Scan everything in the default tablespace */
6475 	RelationCacheInitFileRemoveInDir("base");
6476 
6477 	/* Scan the tablespace link directory to find non-default tablespaces */
6478 	dir = AllocateDir(tblspcdir);
6479 
6480 	while ((de = ReadDirExtended(dir, tblspcdir, LOG)) != NULL)
6481 	{
6482 		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
6483 		{
6484 			/* Scan the tablespace dir for per-database dirs */
6485 			snprintf(path, sizeof(path), "%s/%s/%s",
6486 					 tblspcdir, de->d_name, TABLESPACE_VERSION_DIRECTORY);
6487 			RelationCacheInitFileRemoveInDir(path);
6488 		}
6489 	}
6490 
6491 	FreeDir(dir);
6492 }
6493 
6494 /* Process one per-tablespace directory for RelationCacheInitFileRemove */
6495 static void
RelationCacheInitFileRemoveInDir(const char * tblspcpath)6496 RelationCacheInitFileRemoveInDir(const char *tblspcpath)
6497 {
6498 	DIR		   *dir;
6499 	struct dirent *de;
6500 	char		initfilename[MAXPGPATH * 2];
6501 
6502 	/* Scan the tablespace directory to find per-database directories */
6503 	dir = AllocateDir(tblspcpath);
6504 
6505 	while ((de = ReadDirExtended(dir, tblspcpath, LOG)) != NULL)
6506 	{
6507 		if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
6508 		{
6509 			/* Try to remove the init file in each database */
6510 			snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
6511 					 tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
6512 			unlink_initfile(initfilename, LOG);
6513 		}
6514 	}
6515 
6516 	FreeDir(dir);
6517 }
6518 
/*
 * Unlink the named init file.
 *
 * A file that does not exist (ENOENT) is silently accepted; any other
 * unlink failure is reported at the caller-chosen elevel.
 */
static void
unlink_initfile(const char *initfilename, int elevel)
{
	/* Removed successfully: nothing more to do */
	if (unlink(initfilename) >= 0)
		return;

	/* It simply wasn't there, which is not worth a message */
	if (errno == ENOENT)
		return;

	ereport(elevel,
			(errcode_for_file_access(),
			 errmsg("could not remove cache file \"%s\": %m",
					initfilename)));
}
6532