1 /*-------------------------------------------------------------------------
2  *
3  * heap.c
4  *	  code to create and destroy POSTGRES heap relations
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/catalog/heap.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *		heap_create()			- Create an uncataloged heap relation
16  *		heap_create_with_catalog() - Create a cataloged relation
17  *		heap_drop_with_catalog() - Removes named relation from catalogs
18  *
19  * NOTES
20  *	  this code taken from access/heap/create.c, which contains
21  *	  the old heap_create_with_catalog, amcreate, and amdestroy.
22  *	  those routines will soon call these routines using the function
23  *	  manager,
24  *	  just like the poorly named "NewXXX" routines do.  The
25  *	  "New" routines are all going to die soon, once and for all!
26  *		-cim 1/13/91
27  *
28  *-------------------------------------------------------------------------
29  */
30 #include "postgres.h"
31 
32 #include "access/htup_details.h"
33 #include "access/multixact.h"
34 #include "access/sysattr.h"
35 #include "access/transam.h"
36 #include "access/xact.h"
37 #include "access/xlog.h"
38 #include "catalog/binary_upgrade.h"
39 #include "catalog/catalog.h"
40 #include "catalog/dependency.h"
41 #include "catalog/heap.h"
42 #include "catalog/index.h"
43 #include "catalog/objectaccess.h"
44 #include "catalog/partition.h"
45 #include "catalog/pg_attrdef.h"
46 #include "catalog/pg_collation.h"
47 #include "catalog/pg_constraint.h"
48 #include "catalog/pg_constraint_fn.h"
49 #include "catalog/pg_foreign_table.h"
50 #include "catalog/pg_inherits.h"
51 #include "catalog/pg_namespace.h"
52 #include "catalog/pg_opclass.h"
53 #include "catalog/pg_partitioned_table.h"
54 #include "catalog/pg_statistic.h"
55 #include "catalog/pg_subscription_rel.h"
56 #include "catalog/pg_tablespace.h"
57 #include "catalog/pg_type.h"
58 #include "catalog/pg_type_fn.h"
59 #include "catalog/storage.h"
60 #include "catalog/storage_xlog.h"
61 #include "commands/tablecmds.h"
62 #include "commands/typecmds.h"
63 #include "miscadmin.h"
64 #include "nodes/nodeFuncs.h"
65 #include "optimizer/var.h"
66 #include "parser/parse_coerce.h"
67 #include "parser/parse_collate.h"
68 #include "parser/parse_expr.h"
69 #include "parser/parse_relation.h"
70 #include "storage/lmgr.h"
71 #include "storage/predicate.h"
72 #include "storage/smgr.h"
73 #include "utils/acl.h"
74 #include "utils/builtins.h"
75 #include "utils/fmgroids.h"
76 #include "utils/inval.h"
77 #include "utils/lsyscache.h"
78 #include "utils/rel.h"
79 #include "utils/ruleutils.h"
80 #include "utils/snapmgr.h"
81 #include "utils/syscache.h"
82 #include "utils/tqual.h"
83 
84 
85 /* Potentially set by pg_upgrade_support functions */
86 Oid			binary_upgrade_next_heap_pg_class_oid = InvalidOid;
87 Oid			binary_upgrade_next_toast_pg_class_oid = InvalidOid;
88 
89 static void AddNewRelationTuple(Relation pg_class_desc,
90 					Relation new_rel_desc,
91 					Oid new_rel_oid,
92 					Oid new_type_oid,
93 					Oid reloftype,
94 					Oid relowner,
95 					char relkind,
96 					Datum relacl,
97 					Datum reloptions);
98 static ObjectAddress AddNewRelationType(const char *typeName,
99 				   Oid typeNamespace,
100 				   Oid new_rel_oid,
101 				   char new_rel_kind,
102 				   Oid ownerid,
103 				   Oid new_row_type,
104 				   Oid new_array_type);
105 static void RelationRemoveInheritance(Oid relid);
106 static Oid StoreRelCheck(Relation rel, char *ccname, Node *expr,
107 			  bool is_validated, bool is_local, int inhcount,
108 			  bool is_no_inherit, bool is_internal);
109 static void StoreConstraints(Relation rel, List *cooked_constraints,
110 				 bool is_internal);
111 static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
112 							bool allow_merge, bool is_local,
113 							bool is_initially_valid,
114 							bool is_no_inherit);
115 static void SetRelationNumChecks(Relation rel, int numchecks);
116 static Node *cookConstraint(ParseState *pstate,
117 			   Node *raw_constraint,
118 			   char *relname);
119 static List *insert_ordered_unique_oid(List *list, Oid datum);
120 
121 
122 /* ----------------------------------------------------------------
123  *				XXX UGLY HARD CODED BADNESS FOLLOWS XXX
124  *
125  *		these should all be moved to someplace in the lib/catalog
126  *		module, if not obliterated first.
127  * ----------------------------------------------------------------
128  */
129 
130 
131 /*
132  * Note:
133  *		Should the system special case these attributes in the future?
134  *		Advantage:	consume much less space in the ATTRIBUTE relation.
135  *		Disadvantage:  special cases will be all over the place.
136  */
137 
138 /*
139  * The initializers below do not include trailing variable length fields,
140  * but that's OK - we're never going to reference anything beyond the
141  * fixed-size portion of the structure anyway.
142  */
143 
144 static FormData_pg_attribute a1 = {
145 	0, {"ctid"}, TIDOID, 0, sizeof(ItemPointerData),
146 	SelfItemPointerAttributeNumber, 0, -1, -1,
147 	false, 'p', 's', true, false, '\0', false, true, 0
148 };
149 
150 static FormData_pg_attribute a2 = {
151 	0, {"oid"}, OIDOID, 0, sizeof(Oid),
152 	ObjectIdAttributeNumber, 0, -1, -1,
153 	true, 'p', 'i', true, false, '\0', false, true, 0
154 };
155 
156 static FormData_pg_attribute a3 = {
157 	0, {"xmin"}, XIDOID, 0, sizeof(TransactionId),
158 	MinTransactionIdAttributeNumber, 0, -1, -1,
159 	true, 'p', 'i', true, false, '\0', false, true, 0
160 };
161 
162 static FormData_pg_attribute a4 = {
163 	0, {"cmin"}, CIDOID, 0, sizeof(CommandId),
164 	MinCommandIdAttributeNumber, 0, -1, -1,
165 	true, 'p', 'i', true, false, '\0', false, true, 0
166 };
167 
168 static FormData_pg_attribute a5 = {
169 	0, {"xmax"}, XIDOID, 0, sizeof(TransactionId),
170 	MaxTransactionIdAttributeNumber, 0, -1, -1,
171 	true, 'p', 'i', true, false, '\0', false, true, 0
172 };
173 
174 static FormData_pg_attribute a6 = {
175 	0, {"cmax"}, CIDOID, 0, sizeof(CommandId),
176 	MaxCommandIdAttributeNumber, 0, -1, -1,
177 	true, 'p', 'i', true, false, '\0', false, true, 0
178 };
179 
180 /*
181  * We decided to call this attribute "tableoid" rather than say
182  * "classoid" on the basis that in the future there may be more than one
183  * table of a particular class/type. In any case table is still the word
184  * used in SQL.
185  */
186 static FormData_pg_attribute a7 = {
187 	0, {"tableoid"}, OIDOID, 0, sizeof(Oid),
188 	TableOidAttributeNumber, 0, -1, -1,
189 	true, 'p', 'i', true, false, '\0', false, true, 0
190 };
191 
192 static const Form_pg_attribute SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6, &a7};
193 
194 /*
195  * This function returns a Form_pg_attribute pointer for a system attribute.
196  * Note that we elog if the presented attno is invalid, which would only
197  * happen if there's a problem upstream.
198  */
199 Form_pg_attribute
SystemAttributeDefinition(AttrNumber attno,bool relhasoids)200 SystemAttributeDefinition(AttrNumber attno, bool relhasoids)
201 {
202 	if (attno >= 0 || attno < -(int) lengthof(SysAtt))
203 		elog(ERROR, "invalid system attribute number %d", attno);
204 	if (attno == ObjectIdAttributeNumber && !relhasoids)
205 		elog(ERROR, "invalid system attribute number %d", attno);
206 	return SysAtt[-attno - 1];
207 }
208 
209 /*
210  * If the given name is a system attribute name, return a Form_pg_attribute
211  * pointer for a prototype definition.  If not, return NULL.
212  */
213 Form_pg_attribute
SystemAttributeByName(const char * attname,bool relhasoids)214 SystemAttributeByName(const char *attname, bool relhasoids)
215 {
216 	int			j;
217 
218 	for (j = 0; j < (int) lengthof(SysAtt); j++)
219 	{
220 		Form_pg_attribute att = SysAtt[j];
221 
222 		if (relhasoids || att->attnum != ObjectIdAttributeNumber)
223 		{
224 			if (strcmp(NameStr(att->attname), attname) == 0)
225 				return att;
226 		}
227 	}
228 
229 	return NULL;
230 }
231 
232 
233 /* ----------------------------------------------------------------
234  *				XXX END OF UGLY HARD CODED BADNESS XXX
235  * ---------------------------------------------------------------- */
236 
237 
238 /* ----------------------------------------------------------------
239  *		heap_create		- Create an uncataloged heap relation
240  *
241  *		Note API change: the caller must now always provide the OID
242  *		to use for the relation.  The relfilenode may (and, normally,
243  *		should) be left unspecified.
244  *
245  *		rel->rd_rel is initialized by RelationBuildLocalRelation,
246  *		and is mostly zeroes at return.
247  * ----------------------------------------------------------------
248  */
249 Relation
heap_create(const char * relname,Oid relnamespace,Oid reltablespace,Oid relid,Oid relfilenode,TupleDesc tupDesc,char relkind,char relpersistence,bool shared_relation,bool mapped_relation,bool allow_system_table_mods)250 heap_create(const char *relname,
251 			Oid relnamespace,
252 			Oid reltablespace,
253 			Oid relid,
254 			Oid relfilenode,
255 			TupleDesc tupDesc,
256 			char relkind,
257 			char relpersistence,
258 			bool shared_relation,
259 			bool mapped_relation,
260 			bool allow_system_table_mods)
261 {
262 	bool		create_storage;
263 	Relation	rel;
264 
265 	/* The caller must have provided an OID for the relation. */
266 	Assert(OidIsValid(relid));
267 
268 	/*
269 	 * Don't allow creating relations in pg_catalog directly, even though it
270 	 * is allowed to move user defined relations there. Semantics with search
271 	 * paths including pg_catalog are too confusing for now.
272 	 *
273 	 * But allow creating indexes on relations in pg_catalog even if
274 	 * allow_system_table_mods = off, upper layers already guarantee it's on a
275 	 * user defined relation, not a system one.
276 	 */
277 	if (!allow_system_table_mods &&
278 		((IsSystemNamespace(relnamespace) && relkind != RELKIND_INDEX) ||
279 		 IsToastNamespace(relnamespace)) &&
280 		IsNormalProcessingMode())
281 		ereport(ERROR,
282 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
283 				 errmsg("permission denied to create \"%s.%s\"",
284 						get_namespace_name(relnamespace), relname),
285 				 errdetail("System catalog modifications are currently disallowed.")));
286 
287 	/*
288 	 * Decide if we need storage or not, and handle a couple other special
289 	 * cases for particular relkinds.
290 	 */
291 	switch (relkind)
292 	{
293 		case RELKIND_VIEW:
294 		case RELKIND_COMPOSITE_TYPE:
295 		case RELKIND_FOREIGN_TABLE:
296 		case RELKIND_PARTITIONED_TABLE:
297 			create_storage = false;
298 
299 			/*
300 			 * Force reltablespace to zero if the relation has no physical
301 			 * storage.  This is mainly just for cleanliness' sake.
302 			 */
303 			reltablespace = InvalidOid;
304 			break;
305 		case RELKIND_SEQUENCE:
306 			create_storage = true;
307 
308 			/*
309 			 * Force reltablespace to zero for sequences, since we don't
310 			 * support moving them around into different tablespaces.
311 			 */
312 			reltablespace = InvalidOid;
313 			break;
314 		default:
315 			create_storage = true;
316 			break;
317 	}
318 
319 	/*
320 	 * Unless otherwise requested, the physical ID (relfilenode) is initially
321 	 * the same as the logical ID (OID).  When the caller did specify a
322 	 * relfilenode, it already exists; do not attempt to create it.
323 	 */
324 	if (OidIsValid(relfilenode))
325 		create_storage = false;
326 	else
327 		relfilenode = relid;
328 
329 	/*
330 	 * Never allow a pg_class entry to explicitly specify the database's
331 	 * default tablespace in reltablespace; force it to zero instead. This
332 	 * ensures that if the database is cloned with a different default
333 	 * tablespace, the pg_class entry will still match where CREATE DATABASE
334 	 * will put the physically copied relation.
335 	 *
336 	 * Yes, this is a bit of a hack.
337 	 */
338 	if (reltablespace == MyDatabaseTableSpace)
339 		reltablespace = InvalidOid;
340 
341 	/*
342 	 * build the relcache entry.
343 	 */
344 	rel = RelationBuildLocalRelation(relname,
345 									 relnamespace,
346 									 tupDesc,
347 									 relid,
348 									 relfilenode,
349 									 reltablespace,
350 									 shared_relation,
351 									 mapped_relation,
352 									 relpersistence,
353 									 relkind);
354 
355 	/*
356 	 * Have the storage manager create the relation's disk file, if needed.
357 	 *
358 	 * We only create the main fork here, other forks will be created on
359 	 * demand.
360 	 */
361 	if (create_storage)
362 	{
363 		RelationOpenSmgr(rel);
364 		RelationCreateStorage(rel->rd_node, relpersistence);
365 	}
366 
367 	return rel;
368 }
369 
370 /* ----------------------------------------------------------------
371  *		heap_create_with_catalog		- Create a cataloged relation
372  *
373  *		this is done in multiple steps:
374  *
375  *		1) CheckAttributeNamesTypes() is used to make certain the tuple
376  *		   descriptor contains a valid set of attribute names and types
377  *
378  *		2) pg_class is opened and get_relname_relid()
379  *		   performs a scan to ensure that no relation with the
380  *		   same name already exists.
381  *
382  *		3) heap_create() is called to create the new relation on disk.
383  *
384  *		4) TypeCreate() is called to define a new type corresponding
385  *		   to the new relation.
386  *
387  *		5) AddNewRelationTuple() is called to register the
388  *		   relation in pg_class.
389  *
390  *		6) AddNewAttributeTuples() is called to register the
391  *		   new relation's schema in pg_attribute.
392  *
393  *		7) StoreConstraints is called ()		- vadim 08/22/97
394  *
395  *		8) the relations are closed and the new relation's oid
396  *		   is returned.
397  *
398  * ----------------------------------------------------------------
399  */
400 
401 /* --------------------------------
402  *		CheckAttributeNamesTypes
403  *
404  *		this is used to make certain the tuple descriptor contains a
405  *		valid set of attribute names and datatypes.  a problem simply
406  *		generates ereport(ERROR) which aborts the current transaction.
407  * --------------------------------
408  */
409 void
CheckAttributeNamesTypes(TupleDesc tupdesc,char relkind,bool allow_system_table_mods)410 CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
411 						 bool allow_system_table_mods)
412 {
413 	int			i;
414 	int			j;
415 	int			natts = tupdesc->natts;
416 
417 	/* Sanity check on column count */
418 	if (natts < 0 || natts > MaxHeapAttributeNumber)
419 		ereport(ERROR,
420 				(errcode(ERRCODE_TOO_MANY_COLUMNS),
421 				 errmsg("tables can have at most %d columns",
422 						MaxHeapAttributeNumber)));
423 
424 	/*
425 	 * first check for collision with system attribute names
426 	 *
427 	 * Skip this for a view or type relation, since those don't have system
428 	 * attributes.
429 	 */
430 	if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
431 	{
432 		for (i = 0; i < natts; i++)
433 		{
434 			if (SystemAttributeByName(NameStr(tupdesc->attrs[i]->attname),
435 									  tupdesc->tdhasoid) != NULL)
436 				ereport(ERROR,
437 						(errcode(ERRCODE_DUPLICATE_COLUMN),
438 						 errmsg("column name \"%s\" conflicts with a system column name",
439 								NameStr(tupdesc->attrs[i]->attname))));
440 		}
441 	}
442 
443 	/*
444 	 * next check for repeated attribute names
445 	 */
446 	for (i = 1; i < natts; i++)
447 	{
448 		for (j = 0; j < i; j++)
449 		{
450 			if (strcmp(NameStr(tupdesc->attrs[j]->attname),
451 					   NameStr(tupdesc->attrs[i]->attname)) == 0)
452 				ereport(ERROR,
453 						(errcode(ERRCODE_DUPLICATE_COLUMN),
454 						 errmsg("column name \"%s\" specified more than once",
455 								NameStr(tupdesc->attrs[j]->attname))));
456 		}
457 	}
458 
459 	/*
460 	 * next check the attribute types
461 	 */
462 	for (i = 0; i < natts; i++)
463 	{
464 		CheckAttributeType(NameStr(tupdesc->attrs[i]->attname),
465 						   tupdesc->attrs[i]->atttypid,
466 						   tupdesc->attrs[i]->attcollation,
467 						   NIL, /* assume we're creating a new rowtype */
468 						   allow_system_table_mods);
469 	}
470 }
471 
472 /* --------------------------------
473  *		CheckAttributeType
474  *
475  *		Verify that the proposed datatype of an attribute is legal.
476  *		This is needed mainly because there are types (and pseudo-types)
477  *		in the catalogs that we do not support as elements of real tuples.
478  *		We also check some other properties required of a table column.
479  *
480  * If the attribute is being proposed for addition to an existing table or
481  * composite type, pass a one-element list of the rowtype OID as
482  * containing_rowtypes.  When checking a to-be-created rowtype, it's
483  * sufficient to pass NIL, because there could not be any recursive reference
484  * to a not-yet-existing rowtype.
485  * --------------------------------
486  */
487 void
CheckAttributeType(const char * attname,Oid atttypid,Oid attcollation,List * containing_rowtypes,bool allow_system_table_mods)488 CheckAttributeType(const char *attname,
489 				   Oid atttypid, Oid attcollation,
490 				   List *containing_rowtypes,
491 				   bool allow_system_table_mods)
492 {
493 	char		att_typtype = get_typtype(atttypid);
494 	Oid			att_typelem;
495 
496 	if (att_typtype == TYPTYPE_PSEUDO)
497 	{
498 		/*
499 		 * Refuse any attempt to create a pseudo-type column, except for a
500 		 * special hack for pg_statistic: allow ANYARRAY when modifying system
501 		 * catalogs (this allows creating pg_statistic and cloning it during
502 		 * VACUUM FULL)
503 		 */
504 		if (atttypid != ANYARRAYOID || !allow_system_table_mods)
505 			ereport(ERROR,
506 					(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
507 					 errmsg("column \"%s\" has pseudo-type %s",
508 							attname, format_type_be(atttypid))));
509 	}
510 	else if (att_typtype == TYPTYPE_DOMAIN)
511 	{
512 		/*
513 		 * If it's a domain, recurse to check its base type.
514 		 */
515 		CheckAttributeType(attname, getBaseType(atttypid), attcollation,
516 						   containing_rowtypes,
517 						   allow_system_table_mods);
518 	}
519 	else if (att_typtype == TYPTYPE_COMPOSITE)
520 	{
521 		/*
522 		 * For a composite type, recurse into its attributes.
523 		 */
524 		Relation	relation;
525 		TupleDesc	tupdesc;
526 		int			i;
527 
528 		/*
529 		 * Check for self-containment.  Eventually we might be able to allow
530 		 * this (just return without complaint, if so) but it's not clear how
531 		 * many other places would require anti-recursion defenses before it
532 		 * would be safe to allow tables to contain their own rowtype.
533 		 */
534 		if (list_member_oid(containing_rowtypes, atttypid))
535 			ereport(ERROR,
536 					(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
537 					 errmsg("composite type %s cannot be made a member of itself",
538 							format_type_be(atttypid))));
539 
540 		containing_rowtypes = lcons_oid(atttypid, containing_rowtypes);
541 
542 		relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);
543 
544 		tupdesc = RelationGetDescr(relation);
545 
546 		for (i = 0; i < tupdesc->natts; i++)
547 		{
548 			Form_pg_attribute attr = tupdesc->attrs[i];
549 
550 			if (attr->attisdropped)
551 				continue;
552 			CheckAttributeType(NameStr(attr->attname),
553 							   attr->atttypid, attr->attcollation,
554 							   containing_rowtypes,
555 							   allow_system_table_mods);
556 		}
557 
558 		relation_close(relation, AccessShareLock);
559 
560 		containing_rowtypes = list_delete_first(containing_rowtypes);
561 	}
562 	else if (att_typtype == TYPTYPE_RANGE)
563 	{
564 		/*
565 		 * If it's a range, recurse to check its subtype.
566 		 */
567 		CheckAttributeType(attname, get_range_subtype(atttypid),
568 						   get_range_collation(atttypid),
569 						   containing_rowtypes,
570 						   allow_system_table_mods);
571 	}
572 	else if (OidIsValid((att_typelem = get_element_type(atttypid))))
573 	{
574 		/*
575 		 * Must recurse into array types, too, in case they are composite.
576 		 */
577 		CheckAttributeType(attname, att_typelem, attcollation,
578 						   containing_rowtypes,
579 						   allow_system_table_mods);
580 	}
581 
582 	/*
583 	 * This might not be strictly invalid per SQL standard, but it is pretty
584 	 * useless, and it cannot be dumped, so we must disallow it.
585 	 */
586 	if (!OidIsValid(attcollation) && type_is_collatable(atttypid))
587 		ereport(ERROR,
588 				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
589 				 errmsg("no collation was derived for column \"%s\" with collatable type %s",
590 						attname, format_type_be(atttypid)),
591 				 errhint("Use the COLLATE clause to set the collation explicitly.")));
592 }
593 
594 /*
595  * InsertPgAttributeTuple
596  *		Construct and insert a new tuple in pg_attribute.
597  *
598  * Caller has already opened and locked pg_attribute.  new_attribute is the
599  * attribute to insert (but we ignore attacl and attoptions, which are always
600  * initialized to NULL).
601  *
602  * indstate is the index state for CatalogTupleInsertWithInfo.  It can be
603  * passed as NULL, in which case we'll fetch the necessary info.  (Don't do
604  * this when inserting multiple attributes, because it's a tad more
605  * expensive.)
606  */
607 void
InsertPgAttributeTuple(Relation pg_attribute_rel,Form_pg_attribute new_attribute,CatalogIndexState indstate)608 InsertPgAttributeTuple(Relation pg_attribute_rel,
609 					   Form_pg_attribute new_attribute,
610 					   CatalogIndexState indstate)
611 {
612 	Datum		values[Natts_pg_attribute];
613 	bool		nulls[Natts_pg_attribute];
614 	HeapTuple	tup;
615 
616 	/* This is a tad tedious, but way cleaner than what we used to do... */
617 	memset(values, 0, sizeof(values));
618 	memset(nulls, false, sizeof(nulls));
619 
620 	values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_attribute->attrelid);
621 	values[Anum_pg_attribute_attname - 1] = NameGetDatum(&new_attribute->attname);
622 	values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(new_attribute->atttypid);
623 	values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(new_attribute->attstattarget);
624 	values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(new_attribute->attlen);
625 	values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(new_attribute->attnum);
626 	values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(new_attribute->attndims);
627 	values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(new_attribute->attcacheoff);
628 	values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(new_attribute->atttypmod);
629 	values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(new_attribute->attbyval);
630 	values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(new_attribute->attstorage);
631 	values[Anum_pg_attribute_attalign - 1] = CharGetDatum(new_attribute->attalign);
632 	values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(new_attribute->attnotnull);
633 	values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(new_attribute->atthasdef);
634 	values[Anum_pg_attribute_attidentity - 1] = CharGetDatum(new_attribute->attidentity);
635 	values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(new_attribute->attisdropped);
636 	values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal);
637 	values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount);
638 	values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation);
639 
640 	/* start out with empty permissions and empty options */
641 	nulls[Anum_pg_attribute_attacl - 1] = true;
642 	nulls[Anum_pg_attribute_attoptions - 1] = true;
643 	nulls[Anum_pg_attribute_attfdwoptions - 1] = true;
644 
645 	tup = heap_form_tuple(RelationGetDescr(pg_attribute_rel), values, nulls);
646 
647 	/* finally insert the new tuple, update the indexes, and clean up */
648 	if (indstate != NULL)
649 		CatalogTupleInsertWithInfo(pg_attribute_rel, tup, indstate);
650 	else
651 		CatalogTupleInsert(pg_attribute_rel, tup);
652 
653 	heap_freetuple(tup);
654 }
655 
656 /* --------------------------------
657  *		AddNewAttributeTuples
658  *
659  *		this registers the new relation's schema by adding
660  *		tuples to pg_attribute.
661  * --------------------------------
662  */
663 static void
AddNewAttributeTuples(Oid new_rel_oid,TupleDesc tupdesc,char relkind,bool oidislocal,int oidinhcount)664 AddNewAttributeTuples(Oid new_rel_oid,
665 					  TupleDesc tupdesc,
666 					  char relkind,
667 					  bool oidislocal,
668 					  int oidinhcount)
669 {
670 	Form_pg_attribute attr;
671 	int			i;
672 	Relation	rel;
673 	CatalogIndexState indstate;
674 	int			natts = tupdesc->natts;
675 	ObjectAddress myself,
676 				referenced;
677 
678 	/*
679 	 * open pg_attribute and its indexes.
680 	 */
681 	rel = heap_open(AttributeRelationId, RowExclusiveLock);
682 
683 	indstate = CatalogOpenIndexes(rel);
684 
685 	/*
686 	 * First we add the user attributes.  This is also a convenient place to
687 	 * add dependencies on their datatypes and collations.
688 	 */
689 	for (i = 0; i < natts; i++)
690 	{
691 		attr = tupdesc->attrs[i];
692 		/* Fill in the correct relation OID */
693 		attr->attrelid = new_rel_oid;
694 		/* Make sure these are OK, too */
695 		attr->attstattarget = -1;
696 		attr->attcacheoff = -1;
697 
698 		InsertPgAttributeTuple(rel, attr, indstate);
699 
700 		/* Add dependency info */
701 		myself.classId = RelationRelationId;
702 		myself.objectId = new_rel_oid;
703 		myself.objectSubId = i + 1;
704 		referenced.classId = TypeRelationId;
705 		referenced.objectId = attr->atttypid;
706 		referenced.objectSubId = 0;
707 		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
708 
709 		/* The default collation is pinned, so don't bother recording it */
710 		if (OidIsValid(attr->attcollation) &&
711 			attr->attcollation != DEFAULT_COLLATION_OID)
712 		{
713 			referenced.classId = CollationRelationId;
714 			referenced.objectId = attr->attcollation;
715 			referenced.objectSubId = 0;
716 			recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
717 		}
718 	}
719 
720 	/*
721 	 * Next we add the system attributes.  Skip OID if rel has no OIDs. Skip
722 	 * all for a view or type relation.  We don't bother with making datatype
723 	 * dependencies here, since presumably all these types are pinned.
724 	 */
725 	if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
726 	{
727 		for (i = 0; i < (int) lengthof(SysAtt); i++)
728 		{
729 			FormData_pg_attribute attStruct;
730 
731 			/* skip OID where appropriate */
732 			if (!tupdesc->tdhasoid &&
733 				SysAtt[i]->attnum == ObjectIdAttributeNumber)
734 				continue;
735 
736 			memcpy(&attStruct, (char *) SysAtt[i], sizeof(FormData_pg_attribute));
737 
738 			/* Fill in the correct relation OID in the copied tuple */
739 			attStruct.attrelid = new_rel_oid;
740 
741 			/* Fill in correct inheritance info for the OID column */
742 			if (attStruct.attnum == ObjectIdAttributeNumber)
743 			{
744 				attStruct.attislocal = oidislocal;
745 				attStruct.attinhcount = oidinhcount;
746 			}
747 
748 			InsertPgAttributeTuple(rel, &attStruct, indstate);
749 		}
750 	}
751 
752 	/*
753 	 * clean up
754 	 */
755 	CatalogCloseIndexes(indstate);
756 
757 	heap_close(rel, RowExclusiveLock);
758 }
759 
760 /* --------------------------------
761  *		InsertPgClassTuple
762  *
763  *		Construct and insert a new tuple in pg_class.
764  *
765  * Caller has already opened and locked pg_class.
766  * Tuple data is taken from new_rel_desc->rd_rel, except for the
767  * variable-width fields which are not present in a cached reldesc.
768  * relacl and reloptions are passed in Datum form (to avoid having
769  * to reference the data types in heap.h).  Pass (Datum) 0 to set them
770  * to NULL.
771  * --------------------------------
772  */
773 void
InsertPgClassTuple(Relation pg_class_desc,Relation new_rel_desc,Oid new_rel_oid,Datum relacl,Datum reloptions)774 InsertPgClassTuple(Relation pg_class_desc,
775 				   Relation new_rel_desc,
776 				   Oid new_rel_oid,
777 				   Datum relacl,
778 				   Datum reloptions)
779 {
780 	Form_pg_class rd_rel = new_rel_desc->rd_rel;
781 	Datum		values[Natts_pg_class];
782 	bool		nulls[Natts_pg_class];
783 	HeapTuple	tup;
784 
785 	/* This is a tad tedious, but way cleaner than what we used to do... */
786 	memset(values, 0, sizeof(values));
787 	memset(nulls, false, sizeof(nulls));
788 
789 	values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
790 	values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
791 	values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
792 	values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
793 	values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
794 	values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
795 	values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
796 	values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(rd_rel->reltablespace);
797 	values[Anum_pg_class_relpages - 1] = Int32GetDatum(rd_rel->relpages);
798 	values[Anum_pg_class_reltuples - 1] = Float4GetDatum(rd_rel->reltuples);
799 	values[Anum_pg_class_relallvisible - 1] = Int32GetDatum(rd_rel->relallvisible);
800 	values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(rd_rel->reltoastrelid);
801 	values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
802 	values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
803 	values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
804 	values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
805 	values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
806 	values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
807 	values[Anum_pg_class_relhasoids - 1] = BoolGetDatum(rd_rel->relhasoids);
808 	values[Anum_pg_class_relhaspkey - 1] = BoolGetDatum(rd_rel->relhaspkey);
809 	values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
810 	values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
811 	values[Anum_pg_class_relrowsecurity - 1] = BoolGetDatum(rd_rel->relrowsecurity);
812 	values[Anum_pg_class_relforcerowsecurity - 1] = BoolGetDatum(rd_rel->relforcerowsecurity);
813 	values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
814 	values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
815 	values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
816 	values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition);
817 	values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
818 	values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
819 	if (relacl != (Datum) 0)
820 		values[Anum_pg_class_relacl - 1] = relacl;
821 	else
822 		nulls[Anum_pg_class_relacl - 1] = true;
823 	if (reloptions != (Datum) 0)
824 		values[Anum_pg_class_reloptions - 1] = reloptions;
825 	else
826 		nulls[Anum_pg_class_reloptions - 1] = true;
827 
828 	/* relpartbound is set by updating this tuple, if necessary */
829 	nulls[Anum_pg_class_relpartbound - 1] = true;
830 
831 	tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
832 
833 	/*
834 	 * The new tuple must have the oid already chosen for the rel.  Sure would
835 	 * be embarrassing to do this sort of thing in polite company.
836 	 */
837 	HeapTupleSetOid(tup, new_rel_oid);
838 
839 	/* finally insert the new tuple, update the indexes, and clean up */
840 	CatalogTupleInsert(pg_class_desc, tup);
841 
842 	heap_freetuple(tup);
843 }
844 
845 /* --------------------------------
846  *		AddNewRelationTuple
847  *
848  *		this registers the new relation in the catalogs by
849  *		adding a tuple to pg_class.
850  * --------------------------------
851  */
852 static void
AddNewRelationTuple(Relation pg_class_desc,Relation new_rel_desc,Oid new_rel_oid,Oid new_type_oid,Oid reloftype,Oid relowner,char relkind,Datum relacl,Datum reloptions)853 AddNewRelationTuple(Relation pg_class_desc,
854 					Relation new_rel_desc,
855 					Oid new_rel_oid,
856 					Oid new_type_oid,
857 					Oid reloftype,
858 					Oid relowner,
859 					char relkind,
860 					Datum relacl,
861 					Datum reloptions)
862 {
863 	Form_pg_class new_rel_reltup;
864 
865 	/*
866 	 * first we update some of the information in our uncataloged relation's
867 	 * relation descriptor.
868 	 */
869 	new_rel_reltup = new_rel_desc->rd_rel;
870 
871 	switch (relkind)
872 	{
873 		case RELKIND_RELATION:
874 		case RELKIND_MATVIEW:
875 		case RELKIND_INDEX:
876 		case RELKIND_TOASTVALUE:
877 			/* The relation is real, but as yet empty */
878 			new_rel_reltup->relpages = 0;
879 			new_rel_reltup->reltuples = 0;
880 			new_rel_reltup->relallvisible = 0;
881 			break;
882 		case RELKIND_SEQUENCE:
883 			/* Sequences always have a known size */
884 			new_rel_reltup->relpages = 1;
885 			new_rel_reltup->reltuples = 1;
886 			new_rel_reltup->relallvisible = 0;
887 			break;
888 		default:
889 			/* Views, etc, have no disk storage */
890 			new_rel_reltup->relpages = 0;
891 			new_rel_reltup->reltuples = 0;
892 			new_rel_reltup->relallvisible = 0;
893 			break;
894 	}
895 
896 	/* Initialize relfrozenxid and relminmxid */
897 	if (relkind == RELKIND_RELATION ||
898 		relkind == RELKIND_MATVIEW ||
899 		relkind == RELKIND_TOASTVALUE)
900 	{
901 		/*
902 		 * Initialize to the minimum XID that could put tuples in the table.
903 		 * We know that no xacts older than RecentXmin are still running, so
904 		 * that will do.
905 		 */
906 		new_rel_reltup->relfrozenxid = RecentXmin;
907 
908 		/*
909 		 * Similarly, initialize the minimum Multixact to the first value that
910 		 * could possibly be stored in tuples in the table.  Running
911 		 * transactions could reuse values from their local cache, so we are
912 		 * careful to consider all currently running multis.
913 		 *
914 		 * XXX this could be refined further, but is it worth the hassle?
915 		 */
916 		new_rel_reltup->relminmxid = GetOldestMultiXactId();
917 	}
918 	else
919 	{
920 		/*
921 		 * Other relation types will not contain XIDs, so set relfrozenxid to
922 		 * InvalidTransactionId.  (Note: a sequence does contain a tuple, but
923 		 * we force its xmin to be FrozenTransactionId always; see
924 		 * commands/sequence.c.)
925 		 */
926 		new_rel_reltup->relfrozenxid = InvalidTransactionId;
927 		new_rel_reltup->relminmxid = InvalidMultiXactId;
928 	}
929 
930 	new_rel_reltup->relowner = relowner;
931 	new_rel_reltup->reltype = new_type_oid;
932 	new_rel_reltup->reloftype = reloftype;
933 
934 	/* relispartition is always set by updating this tuple later */
935 	new_rel_reltup->relispartition = false;
936 
937 	new_rel_desc->rd_att->tdtypeid = new_type_oid;
938 
939 	/* Now build and insert the tuple */
940 	InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
941 					   relacl, reloptions);
942 }
943 
944 
945 /* --------------------------------
946  *		AddNewRelationType -
947  *
948  *		define a composite type corresponding to the new relation
949  * --------------------------------
950  */
951 static ObjectAddress
AddNewRelationType(const char * typeName,Oid typeNamespace,Oid new_rel_oid,char new_rel_kind,Oid ownerid,Oid new_row_type,Oid new_array_type)952 AddNewRelationType(const char *typeName,
953 				   Oid typeNamespace,
954 				   Oid new_rel_oid,
955 				   char new_rel_kind,
956 				   Oid ownerid,
957 				   Oid new_row_type,
958 				   Oid new_array_type)
959 {
960 	return
961 		TypeCreate(new_row_type,	/* optional predetermined OID */
962 				   typeName,	/* type name */
963 				   typeNamespace,	/* type namespace */
964 				   new_rel_oid, /* relation oid */
965 				   new_rel_kind,	/* relation kind */
966 				   ownerid,		/* owner's ID */
967 				   -1,			/* internal size (varlena) */
968 				   TYPTYPE_COMPOSITE,	/* type-type (composite) */
969 				   TYPCATEGORY_COMPOSITE,	/* type-category (ditto) */
970 				   false,		/* composite types are never preferred */
971 				   DEFAULT_TYPDELIM,	/* default array delimiter */
972 				   F_RECORD_IN, /* input procedure */
973 				   F_RECORD_OUT,	/* output procedure */
974 				   F_RECORD_RECV,	/* receive procedure */
975 				   F_RECORD_SEND,	/* send procedure */
976 				   InvalidOid,	/* typmodin procedure - none */
977 				   InvalidOid,	/* typmodout procedure - none */
978 				   InvalidOid,	/* analyze procedure - default */
979 				   InvalidOid,	/* array element type - irrelevant */
980 				   false,		/* this is not an array type */
981 				   new_array_type,	/* array type if any */
982 				   InvalidOid,	/* domain base type - irrelevant */
983 				   NULL,		/* default value - none */
984 				   NULL,		/* default binary representation */
985 				   false,		/* passed by reference */
986 				   'd',			/* alignment - must be the largest! */
987 				   'x',			/* fully TOASTable */
988 				   -1,			/* typmod */
989 				   0,			/* array dimensions for typBaseType */
990 				   false,		/* Type NOT NULL */
991 				   InvalidOid); /* rowtypes never have a collation */
992 }
993 
994 /* --------------------------------
995  *		heap_create_with_catalog
996  *
997  *		creates a new cataloged relation.  see comments above.
998  *
999  * Arguments:
1000  *	relname: name to give to new rel
1001  *	relnamespace: OID of namespace it goes in
1002  *	reltablespace: OID of tablespace it goes in
1003  *	relid: OID to assign to new rel, or InvalidOid to select a new OID
1004  *	reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
1005  *	reloftypeid: if a typed table, OID of underlying type; else InvalidOid
1006  *	ownerid: OID of new rel's owner
1007  *	tupdesc: tuple descriptor (source of column definitions)
1008  *	cooked_constraints: list of precooked check constraints and defaults
1009  *	relkind: relkind for new rel
1010  *	relpersistence: rel's persistence status (permanent, temp, or unlogged)
1011  *	shared_relation: TRUE if it's to be a shared relation
1012  *	mapped_relation: TRUE if the relation will use the relfilenode map
1013  *	oidislocal: TRUE if oid column (if any) should be marked attislocal
1014  *	oidinhcount: attinhcount to assign to oid column (if any)
1015  *	oncommit: ON COMMIT marking (only relevant if it's a temp table)
1016  *	reloptions: reloptions in Datum form, or (Datum) 0 if none
1017  *	use_user_acl: TRUE if should look for user-defined default permissions;
1018  *		if FALSE, relacl is always set NULL
1019  *	allow_system_table_mods: TRUE to allow creation in system namespaces
1020  *	is_internal: is this a system-generated catalog?
1021  *
1022  * Output parameters:
1023  *	typaddress: if not null, gets the object address of the new pg_type entry
1024  *
1025  * Returns the OID of the new relation
1026  * --------------------------------
1027  */
1028 Oid
heap_create_with_catalog(const char * relname,Oid relnamespace,Oid reltablespace,Oid relid,Oid reltypeid,Oid reloftypeid,Oid ownerid,TupleDesc tupdesc,List * cooked_constraints,char relkind,char relpersistence,bool shared_relation,bool mapped_relation,bool oidislocal,int oidinhcount,OnCommitAction oncommit,Datum reloptions,bool use_user_acl,bool allow_system_table_mods,bool is_internal,ObjectAddress * typaddress)1029 heap_create_with_catalog(const char *relname,
1030 						 Oid relnamespace,
1031 						 Oid reltablespace,
1032 						 Oid relid,
1033 						 Oid reltypeid,
1034 						 Oid reloftypeid,
1035 						 Oid ownerid,
1036 						 TupleDesc tupdesc,
1037 						 List *cooked_constraints,
1038 						 char relkind,
1039 						 char relpersistence,
1040 						 bool shared_relation,
1041 						 bool mapped_relation,
1042 						 bool oidislocal,
1043 						 int oidinhcount,
1044 						 OnCommitAction oncommit,
1045 						 Datum reloptions,
1046 						 bool use_user_acl,
1047 						 bool allow_system_table_mods,
1048 						 bool is_internal,
1049 						 ObjectAddress *typaddress)
1050 {
1051 	Relation	pg_class_desc;
1052 	Relation	new_rel_desc;
1053 	Acl		   *relacl;
1054 	Oid			existing_relid;
1055 	Oid			old_type_oid;
1056 	Oid			new_type_oid;
1057 	ObjectAddress new_type_addr;
1058 	Oid			new_array_oid = InvalidOid;
1059 
1060 	pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
1061 
1062 	/*
1063 	 * sanity checks
1064 	 */
1065 	Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
1066 
1067 	CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
1068 
1069 	/*
1070 	 * This would fail later on anyway, if the relation already exists.  But
1071 	 * by catching it here we can emit a nicer error message.
1072 	 */
1073 	existing_relid = get_relname_relid(relname, relnamespace);
1074 	if (existing_relid != InvalidOid)
1075 		ereport(ERROR,
1076 				(errcode(ERRCODE_DUPLICATE_TABLE),
1077 				 errmsg("relation \"%s\" already exists", relname)));
1078 
1079 	/*
1080 	 * Since we are going to create a rowtype as well, also check for
1081 	 * collision with an existing type name.  If there is one and it's an
1082 	 * autogenerated array, we can rename it out of the way; otherwise we can
1083 	 * at least give a good error message.
1084 	 */
1085 	old_type_oid = GetSysCacheOid2(TYPENAMENSP,
1086 								   CStringGetDatum(relname),
1087 								   ObjectIdGetDatum(relnamespace));
1088 	if (OidIsValid(old_type_oid))
1089 	{
1090 		if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
1091 			ereport(ERROR,
1092 					(errcode(ERRCODE_DUPLICATE_OBJECT),
1093 					 errmsg("type \"%s\" already exists", relname),
1094 					 errhint("A relation has an associated type of the same name, "
1095 							 "so you must use a name that doesn't conflict "
1096 							 "with any existing type.")));
1097 	}
1098 
1099 	/*
1100 	 * Shared relations must be in pg_global (last-ditch check)
1101 	 */
1102 	if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
1103 		elog(ERROR, "shared relations must be placed in pg_global tablespace");
1104 
1105 	/*
1106 	 * Allocate an OID for the relation, unless we were told what to use.
1107 	 *
1108 	 * The OID will be the relfilenode as well, so make sure it doesn't
1109 	 * collide with either pg_class OIDs or existing physical files.
1110 	 */
1111 	if (!OidIsValid(relid))
1112 	{
1113 		/* Use binary-upgrade override for pg_class.oid/relfilenode? */
1114 		if (IsBinaryUpgrade &&
1115 			(relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
1116 			 relkind == RELKIND_VIEW || relkind == RELKIND_MATVIEW ||
1117 			 relkind == RELKIND_COMPOSITE_TYPE || relkind == RELKIND_FOREIGN_TABLE ||
1118 			 relkind == RELKIND_PARTITIONED_TABLE))
1119 		{
1120 			if (!OidIsValid(binary_upgrade_next_heap_pg_class_oid))
1121 				ereport(ERROR,
1122 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1123 						 errmsg("pg_class heap OID value not set when in binary upgrade mode")));
1124 
1125 			relid = binary_upgrade_next_heap_pg_class_oid;
1126 			binary_upgrade_next_heap_pg_class_oid = InvalidOid;
1127 		}
1128 		/* There might be no TOAST table, so we have to test for it. */
1129 		else if (IsBinaryUpgrade &&
1130 				 OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
1131 				 relkind == RELKIND_TOASTVALUE)
1132 		{
1133 			relid = binary_upgrade_next_toast_pg_class_oid;
1134 			binary_upgrade_next_toast_pg_class_oid = InvalidOid;
1135 		}
1136 		else
1137 			relid = GetNewRelFileNode(reltablespace, pg_class_desc,
1138 									  relpersistence);
1139 	}
1140 
1141 	/*
1142 	 * Determine the relation's initial permissions.
1143 	 */
1144 	if (use_user_acl)
1145 	{
1146 		switch (relkind)
1147 		{
1148 			case RELKIND_RELATION:
1149 			case RELKIND_VIEW:
1150 			case RELKIND_MATVIEW:
1151 			case RELKIND_FOREIGN_TABLE:
1152 			case RELKIND_PARTITIONED_TABLE:
1153 				relacl = get_user_default_acl(ACL_OBJECT_RELATION, ownerid,
1154 											  relnamespace);
1155 				break;
1156 			case RELKIND_SEQUENCE:
1157 				relacl = get_user_default_acl(ACL_OBJECT_SEQUENCE, ownerid,
1158 											  relnamespace);
1159 				break;
1160 			default:
1161 				relacl = NULL;
1162 				break;
1163 		}
1164 	}
1165 	else
1166 		relacl = NULL;
1167 
1168 	/*
1169 	 * Create the relcache entry (mostly dummy at this point) and the physical
1170 	 * disk file.  (If we fail further down, it's the smgr's responsibility to
1171 	 * remove the disk file again.)
1172 	 */
1173 	new_rel_desc = heap_create(relname,
1174 							   relnamespace,
1175 							   reltablespace,
1176 							   relid,
1177 							   InvalidOid,
1178 							   tupdesc,
1179 							   relkind,
1180 							   relpersistence,
1181 							   shared_relation,
1182 							   mapped_relation,
1183 							   allow_system_table_mods);
1184 
1185 	Assert(relid == RelationGetRelid(new_rel_desc));
1186 
1187 	/*
1188 	 * Decide whether to create an array type over the relation's rowtype. We
1189 	 * do not create any array types for system catalogs (ie, those made
1190 	 * during initdb). We do not create them where the use of a relation as
1191 	 * such is an implementation detail: toast tables, sequences and indexes.
1192 	 */
1193 	if (IsUnderPostmaster && (relkind == RELKIND_RELATION ||
1194 							  relkind == RELKIND_VIEW ||
1195 							  relkind == RELKIND_MATVIEW ||
1196 							  relkind == RELKIND_FOREIGN_TABLE ||
1197 							  relkind == RELKIND_COMPOSITE_TYPE ||
1198 							  relkind == RELKIND_PARTITIONED_TABLE))
1199 		new_array_oid = AssignTypeArrayOid();
1200 
1201 	/*
1202 	 * Since defining a relation also defines a complex type, we add a new
1203 	 * system type corresponding to the new relation.  The OID of the type can
1204 	 * be preselected by the caller, but if reltypeid is InvalidOid, we'll
1205 	 * generate a new OID for it.
1206 	 *
1207 	 * NOTE: we could get a unique-index failure here, in case someone else is
1208 	 * creating the same type name in parallel but hadn't committed yet when
1209 	 * we checked for a duplicate name above.
1210 	 */
1211 	new_type_addr = AddNewRelationType(relname,
1212 									   relnamespace,
1213 									   relid,
1214 									   relkind,
1215 									   ownerid,
1216 									   reltypeid,
1217 									   new_array_oid);
1218 	new_type_oid = new_type_addr.objectId;
1219 	if (typaddress)
1220 		*typaddress = new_type_addr;
1221 
1222 	/*
1223 	 * Now make the array type if wanted.
1224 	 */
1225 	if (OidIsValid(new_array_oid))
1226 	{
1227 		char	   *relarrayname;
1228 
1229 		relarrayname = makeArrayTypeName(relname, relnamespace);
1230 
1231 		TypeCreate(new_array_oid,	/* force the type's OID to this */
1232 				   relarrayname,	/* Array type name */
1233 				   relnamespace,	/* Same namespace as parent */
1234 				   InvalidOid,	/* Not composite, no relationOid */
1235 				   0,			/* relkind, also N/A here */
1236 				   ownerid,		/* owner's ID */
1237 				   -1,			/* Internal size (varlena) */
1238 				   TYPTYPE_BASE,	/* Not composite - typelem is */
1239 				   TYPCATEGORY_ARRAY,	/* type-category (array) */
1240 				   false,		/* array types are never preferred */
1241 				   DEFAULT_TYPDELIM,	/* default array delimiter */
1242 				   F_ARRAY_IN,	/* array input proc */
1243 				   F_ARRAY_OUT, /* array output proc */
1244 				   F_ARRAY_RECV,	/* array recv (bin) proc */
1245 				   F_ARRAY_SEND,	/* array send (bin) proc */
1246 				   InvalidOid,	/* typmodin procedure - none */
1247 				   InvalidOid,	/* typmodout procedure - none */
1248 				   F_ARRAY_TYPANALYZE,	/* array analyze procedure */
1249 				   new_type_oid,	/* array element type - the rowtype */
1250 				   true,		/* yes, this is an array type */
1251 				   InvalidOid,	/* this has no array type */
1252 				   InvalidOid,	/* domain base type - irrelevant */
1253 				   NULL,		/* default value - none */
1254 				   NULL,		/* default binary representation */
1255 				   false,		/* passed by reference */
1256 				   'd',			/* alignment - must be the largest! */
1257 				   'x',			/* fully TOASTable */
1258 				   -1,			/* typmod */
1259 				   0,			/* array dimensions for typBaseType */
1260 				   false,		/* Type NOT NULL */
1261 				   InvalidOid); /* rowtypes never have a collation */
1262 
1263 		pfree(relarrayname);
1264 	}
1265 
1266 	/*
1267 	 * now create an entry in pg_class for the relation.
1268 	 *
1269 	 * NOTE: we could get a unique-index failure here, in case someone else is
1270 	 * creating the same relation name in parallel but hadn't committed yet
1271 	 * when we checked for a duplicate name above.
1272 	 */
1273 	AddNewRelationTuple(pg_class_desc,
1274 						new_rel_desc,
1275 						relid,
1276 						new_type_oid,
1277 						reloftypeid,
1278 						ownerid,
1279 						relkind,
1280 						PointerGetDatum(relacl),
1281 						reloptions);
1282 
1283 	/*
1284 	 * now add tuples to pg_attribute for the attributes in our new relation.
1285 	 */
1286 	AddNewAttributeTuples(relid, new_rel_desc->rd_att, relkind,
1287 						  oidislocal, oidinhcount);
1288 
1289 	/*
1290 	 * Make a dependency link to force the relation to be deleted if its
1291 	 * namespace is.  Also make a dependency link to its owner, as well as
1292 	 * dependencies for any roles mentioned in the default ACL.
1293 	 *
1294 	 * For composite types, these dependencies are tracked for the pg_type
1295 	 * entry, so we needn't record them here.  Likewise, TOAST tables don't
1296 	 * need a namespace dependency (they live in a pinned namespace) nor an
1297 	 * owner dependency (they depend indirectly through the parent table), nor
1298 	 * should they have any ACL entries.  The same applies for extension
1299 	 * dependencies.
1300 	 *
1301 	 * Also, skip this in bootstrap mode, since we don't make dependencies
1302 	 * while bootstrapping.
1303 	 */
1304 	if (relkind != RELKIND_COMPOSITE_TYPE &&
1305 		relkind != RELKIND_TOASTVALUE &&
1306 		!IsBootstrapProcessingMode())
1307 	{
1308 		ObjectAddress myself,
1309 					referenced;
1310 
1311 		myself.classId = RelationRelationId;
1312 		myself.objectId = relid;
1313 		myself.objectSubId = 0;
1314 
1315 		referenced.classId = NamespaceRelationId;
1316 		referenced.objectId = relnamespace;
1317 		referenced.objectSubId = 0;
1318 		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1319 
1320 		recordDependencyOnOwner(RelationRelationId, relid, ownerid);
1321 
1322 		recordDependencyOnNewAcl(RelationRelationId, relid, 0, ownerid, relacl);
1323 
1324 		recordDependencyOnCurrentExtension(&myself, false);
1325 
1326 		if (reloftypeid)
1327 		{
1328 			referenced.classId = TypeRelationId;
1329 			referenced.objectId = reloftypeid;
1330 			referenced.objectSubId = 0;
1331 			recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1332 		}
1333 	}
1334 
1335 	/* Post creation hook for new relation */
1336 	InvokeObjectPostCreateHookArg(RelationRelationId, relid, 0, is_internal);
1337 
1338 	/*
1339 	 * Store any supplied constraints and defaults.
1340 	 *
1341 	 * NB: this may do a CommandCounterIncrement and rebuild the relcache
1342 	 * entry, so the relation must be valid and self-consistent at this point.
1343 	 * In particular, there are not yet constraints and defaults anywhere.
1344 	 */
1345 	StoreConstraints(new_rel_desc, cooked_constraints, is_internal);
1346 
1347 	/*
1348 	 * If there's a special on-commit action, remember it
1349 	 */
1350 	if (oncommit != ONCOMMIT_NOOP)
1351 		register_on_commit_action(relid, oncommit);
1352 
1353 	/*
1354 	 * Unlogged objects need an init fork, except for partitioned tables which
1355 	 * have no storage at all.
1356 	 */
1357 	if (relpersistence == RELPERSISTENCE_UNLOGGED &&
1358 		relkind != RELKIND_PARTITIONED_TABLE)
1359 		heap_create_init_fork(new_rel_desc);
1360 
1361 	/*
1362 	 * ok, the relation has been cataloged, so close our relations and return
1363 	 * the OID of the newly created relation.
1364 	 */
1365 	heap_close(new_rel_desc, NoLock);	/* do not unlock till end of xact */
1366 	heap_close(pg_class_desc, RowExclusiveLock);
1367 
1368 	return relid;
1369 }
1370 
1371 /*
1372  * Set up an init fork for an unlogged table so that it can be correctly
1373  * reinitialized on restart.  An immediate sync is required even if the
1374  * page has been logged, because the write did not go through
1375  * shared_buffers and therefore a concurrent checkpoint may have moved
1376  * the redo pointer past our xlog record.  Recovery may as well remove it
1377  * while replaying, for example, XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE
1378  * record. Therefore, logging is necessary even if wal_level=minimal.
1379  */
1380 void
heap_create_init_fork(Relation rel)1381 heap_create_init_fork(Relation rel)
1382 {
1383 	Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
1384 		   rel->rd_rel->relkind == RELKIND_MATVIEW ||
1385 		   rel->rd_rel->relkind == RELKIND_TOASTVALUE);
1386 	RelationOpenSmgr(rel);
1387 	smgrcreate(rel->rd_smgr, INIT_FORKNUM, false);
1388 	log_smgrcreate(&rel->rd_smgr->smgr_rnode.node, INIT_FORKNUM);
1389 	smgrimmedsync(rel->rd_smgr, INIT_FORKNUM);
1390 }
1391 
1392 /*
1393  *		RelationRemoveInheritance
1394  *
1395  * Formerly, this routine checked for child relations and aborted the
1396  * deletion if any were found.  Now we rely on the dependency mechanism
1397  * to check for or delete child relations.  By the time we get here,
1398  * there are no children and we need only remove any pg_inherits rows
1399  * linking this relation to its parent(s).
1400  */
1401 static void
RelationRemoveInheritance(Oid relid)1402 RelationRemoveInheritance(Oid relid)
1403 {
1404 	Relation	catalogRelation;
1405 	SysScanDesc scan;
1406 	ScanKeyData key;
1407 	HeapTuple	tuple;
1408 
1409 	catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
1410 
1411 	ScanKeyInit(&key,
1412 				Anum_pg_inherits_inhrelid,
1413 				BTEqualStrategyNumber, F_OIDEQ,
1414 				ObjectIdGetDatum(relid));
1415 
1416 	scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true,
1417 							  NULL, 1, &key);
1418 
1419 	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1420 		CatalogTupleDelete(catalogRelation, &tuple->t_self);
1421 
1422 	systable_endscan(scan);
1423 	heap_close(catalogRelation, RowExclusiveLock);
1424 }
1425 
1426 /*
1427  *		DeleteRelationTuple
1428  *
1429  * Remove pg_class row for the given relid.
1430  *
1431  * Note: this is shared by relation deletion and index deletion.  It's
1432  * not intended for use anyplace else.
1433  */
1434 void
DeleteRelationTuple(Oid relid)1435 DeleteRelationTuple(Oid relid)
1436 {
1437 	Relation	pg_class_desc;
1438 	HeapTuple	tup;
1439 
1440 	/* Grab an appropriate lock on the pg_class relation */
1441 	pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
1442 
1443 	tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1444 	if (!HeapTupleIsValid(tup))
1445 		elog(ERROR, "cache lookup failed for relation %u", relid);
1446 
1447 	/* delete the relation tuple from pg_class, and finish up */
1448 	CatalogTupleDelete(pg_class_desc, &tup->t_self);
1449 
1450 	ReleaseSysCache(tup);
1451 
1452 	heap_close(pg_class_desc, RowExclusiveLock);
1453 }
1454 
1455 /*
1456  *		DeleteAttributeTuples
1457  *
1458  * Remove pg_attribute rows for the given relid.
1459  *
1460  * Note: this is shared by relation deletion and index deletion.  It's
1461  * not intended for use anyplace else.
1462  */
1463 void
DeleteAttributeTuples(Oid relid)1464 DeleteAttributeTuples(Oid relid)
1465 {
1466 	Relation	attrel;
1467 	SysScanDesc scan;
1468 	ScanKeyData key[1];
1469 	HeapTuple	atttup;
1470 
1471 	/* Grab an appropriate lock on the pg_attribute relation */
1472 	attrel = heap_open(AttributeRelationId, RowExclusiveLock);
1473 
1474 	/* Use the index to scan only attributes of the target relation */
1475 	ScanKeyInit(&key[0],
1476 				Anum_pg_attribute_attrelid,
1477 				BTEqualStrategyNumber, F_OIDEQ,
1478 				ObjectIdGetDatum(relid));
1479 
1480 	scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1481 							  NULL, 1, key);
1482 
1483 	/* Delete all the matching tuples */
1484 	while ((atttup = systable_getnext(scan)) != NULL)
1485 		CatalogTupleDelete(attrel, &atttup->t_self);
1486 
1487 	/* Clean up after the scan */
1488 	systable_endscan(scan);
1489 	heap_close(attrel, RowExclusiveLock);
1490 }
1491 
1492 /*
1493  *		DeleteSystemAttributeTuples
1494  *
1495  * Remove pg_attribute rows for system columns of the given relid.
1496  *
1497  * Note: this is only used when converting a table to a view.  Views don't
1498  * have system columns, so we should remove them from pg_attribute.
1499  */
1500 void
DeleteSystemAttributeTuples(Oid relid)1501 DeleteSystemAttributeTuples(Oid relid)
1502 {
1503 	Relation	attrel;
1504 	SysScanDesc scan;
1505 	ScanKeyData key[2];
1506 	HeapTuple	atttup;
1507 
1508 	/* Grab an appropriate lock on the pg_attribute relation */
1509 	attrel = heap_open(AttributeRelationId, RowExclusiveLock);
1510 
1511 	/* Use the index to scan only system attributes of the target relation */
1512 	ScanKeyInit(&key[0],
1513 				Anum_pg_attribute_attrelid,
1514 				BTEqualStrategyNumber, F_OIDEQ,
1515 				ObjectIdGetDatum(relid));
1516 	ScanKeyInit(&key[1],
1517 				Anum_pg_attribute_attnum,
1518 				BTLessEqualStrategyNumber, F_INT2LE,
1519 				Int16GetDatum(0));
1520 
1521 	scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1522 							  NULL, 2, key);
1523 
1524 	/* Delete all the matching tuples */
1525 	while ((atttup = systable_getnext(scan)) != NULL)
1526 		CatalogTupleDelete(attrel, &atttup->t_self);
1527 
1528 	/* Clean up after the scan */
1529 	systable_endscan(scan);
1530 	heap_close(attrel, RowExclusiveLock);
1531 }
1532 
1533 /*
1534  *		RemoveAttributeById
1535  *
1536  * This is the guts of ALTER TABLE DROP COLUMN: actually mark the attribute
1537  * deleted in pg_attribute.  We also remove pg_statistic entries for it.
1538  * (Everything else needed, such as getting rid of any pg_attrdef entry,
1539  * is handled by dependency.c.)
1540  */
1541 void
RemoveAttributeById(Oid relid,AttrNumber attnum)1542 RemoveAttributeById(Oid relid, AttrNumber attnum)
1543 {
1544 	Relation	rel;
1545 	Relation	attr_rel;
1546 	HeapTuple	tuple;
1547 	Form_pg_attribute attStruct;
1548 	char		newattname[NAMEDATALEN];
1549 
1550 	/*
1551 	 * Grab an exclusive lock on the target table, which we will NOT release
1552 	 * until end of transaction.  (In the simple case where we are directly
1553 	 * dropping this column, AlterTableDropColumn already did this ... but
1554 	 * when cascading from a drop of some other object, we may not have any
1555 	 * lock.)
1556 	 */
1557 	rel = relation_open(relid, AccessExclusiveLock);
1558 
1559 	attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
1560 
1561 	tuple = SearchSysCacheCopy2(ATTNUM,
1562 								ObjectIdGetDatum(relid),
1563 								Int16GetDatum(attnum));
1564 	if (!HeapTupleIsValid(tuple))	/* shouldn't happen */
1565 		elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1566 			 attnum, relid);
1567 	attStruct = (Form_pg_attribute) GETSTRUCT(tuple);
1568 
1569 	if (attnum < 0)
1570 	{
1571 		/* System attribute (probably OID) ... just delete the row */
1572 
1573 		CatalogTupleDelete(attr_rel, &tuple->t_self);
1574 	}
1575 	else
1576 	{
1577 		/* Dropping user attributes is lots harder */
1578 
1579 		/* Mark the attribute as dropped */
1580 		attStruct->attisdropped = true;
1581 
1582 		/*
1583 		 * Set the type OID to invalid.  A dropped attribute's type link
1584 		 * cannot be relied on (once the attribute is dropped, the type might
1585 		 * be too). Fortunately we do not need the type row --- the only
1586 		 * really essential information is the type's typlen and typalign,
1587 		 * which are preserved in the attribute's attlen and attalign.  We set
1588 		 * atttypid to zero here as a means of catching code that incorrectly
1589 		 * expects it to be valid.
1590 		 */
1591 		attStruct->atttypid = InvalidOid;
1592 
1593 		/* Remove any NOT NULL constraint the column may have */
1594 		attStruct->attnotnull = false;
1595 
1596 		/* We don't want to keep stats for it anymore */
1597 		attStruct->attstattarget = 0;
1598 
1599 		/*
1600 		 * Change the column name to something that isn't likely to conflict
1601 		 */
1602 		snprintf(newattname, sizeof(newattname),
1603 				 "........pg.dropped.%d........", attnum);
1604 		namestrcpy(&(attStruct->attname), newattname);
1605 
1606 		CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
1607 	}
1608 
1609 	/*
1610 	 * Because updating the pg_attribute row will trigger a relcache flush for
1611 	 * the target relation, we need not do anything else to notify other
1612 	 * backends of the change.
1613 	 */
1614 
1615 	heap_close(attr_rel, RowExclusiveLock);
1616 
1617 	if (attnum > 0)
1618 		RemoveStatistics(relid, attnum);
1619 
1620 	relation_close(rel, NoLock);
1621 }
1622 
1623 /*
1624  *		RemoveAttrDefault
1625  *
1626  * If the specified relation/attribute has a default, remove it.
1627  * (If no default, raise error if complain is true, else return quietly.)
1628  */
1629 void
RemoveAttrDefault(Oid relid,AttrNumber attnum,DropBehavior behavior,bool complain,bool internal)1630 RemoveAttrDefault(Oid relid, AttrNumber attnum,
1631 				  DropBehavior behavior, bool complain, bool internal)
1632 {
1633 	Relation	attrdef_rel;
1634 	ScanKeyData scankeys[2];
1635 	SysScanDesc scan;
1636 	HeapTuple	tuple;
1637 	bool		found = false;
1638 
1639 	attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
1640 
1641 	ScanKeyInit(&scankeys[0],
1642 				Anum_pg_attrdef_adrelid,
1643 				BTEqualStrategyNumber, F_OIDEQ,
1644 				ObjectIdGetDatum(relid));
1645 	ScanKeyInit(&scankeys[1],
1646 				Anum_pg_attrdef_adnum,
1647 				BTEqualStrategyNumber, F_INT2EQ,
1648 				Int16GetDatum(attnum));
1649 
1650 	scan = systable_beginscan(attrdef_rel, AttrDefaultIndexId, true,
1651 							  NULL, 2, scankeys);
1652 
1653 	/* There should be at most one matching tuple, but we loop anyway */
1654 	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1655 	{
1656 		ObjectAddress object;
1657 
1658 		object.classId = AttrDefaultRelationId;
1659 		object.objectId = HeapTupleGetOid(tuple);
1660 		object.objectSubId = 0;
1661 
1662 		performDeletion(&object, behavior,
1663 						internal ? PERFORM_DELETION_INTERNAL : 0);
1664 
1665 		found = true;
1666 	}
1667 
1668 	systable_endscan(scan);
1669 	heap_close(attrdef_rel, RowExclusiveLock);
1670 
1671 	if (complain && !found)
1672 		elog(ERROR, "could not find attrdef tuple for relation %u attnum %d",
1673 			 relid, attnum);
1674 }
1675 
1676 /*
1677  *		RemoveAttrDefaultById
1678  *
1679  * Remove a pg_attrdef entry specified by OID.  This is the guts of
1680  * attribute-default removal.  Note it should be called via performDeletion,
1681  * not directly.
1682  */
1683 void
RemoveAttrDefaultById(Oid attrdefId)1684 RemoveAttrDefaultById(Oid attrdefId)
1685 {
1686 	Relation	attrdef_rel;
1687 	Relation	attr_rel;
1688 	Relation	myrel;
1689 	ScanKeyData scankeys[1];
1690 	SysScanDesc scan;
1691 	HeapTuple	tuple;
1692 	Oid			myrelid;
1693 	AttrNumber	myattnum;
1694 
1695 	/* Grab an appropriate lock on the pg_attrdef relation */
1696 	attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
1697 
1698 	/* Find the pg_attrdef tuple */
1699 	ScanKeyInit(&scankeys[0],
1700 				ObjectIdAttributeNumber,
1701 				BTEqualStrategyNumber, F_OIDEQ,
1702 				ObjectIdGetDatum(attrdefId));
1703 
1704 	scan = systable_beginscan(attrdef_rel, AttrDefaultOidIndexId, true,
1705 							  NULL, 1, scankeys);
1706 
1707 	tuple = systable_getnext(scan);
1708 	if (!HeapTupleIsValid(tuple))
1709 		elog(ERROR, "could not find tuple for attrdef %u", attrdefId);
1710 
1711 	myrelid = ((Form_pg_attrdef) GETSTRUCT(tuple))->adrelid;
1712 	myattnum = ((Form_pg_attrdef) GETSTRUCT(tuple))->adnum;
1713 
1714 	/* Get an exclusive lock on the relation owning the attribute */
1715 	myrel = relation_open(myrelid, AccessExclusiveLock);
1716 
1717 	/* Now we can delete the pg_attrdef row */
1718 	CatalogTupleDelete(attrdef_rel, &tuple->t_self);
1719 
1720 	systable_endscan(scan);
1721 	heap_close(attrdef_rel, RowExclusiveLock);
1722 
1723 	/* Fix the pg_attribute row */
1724 	attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
1725 
1726 	tuple = SearchSysCacheCopy2(ATTNUM,
1727 								ObjectIdGetDatum(myrelid),
1728 								Int16GetDatum(myattnum));
1729 	if (!HeapTupleIsValid(tuple))	/* shouldn't happen */
1730 		elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1731 			 myattnum, myrelid);
1732 
1733 	((Form_pg_attribute) GETSTRUCT(tuple))->atthasdef = false;
1734 
1735 	CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
1736 
1737 	/*
1738 	 * Our update of the pg_attribute row will force a relcache rebuild, so
1739 	 * there's nothing else to do here.
1740 	 */
1741 	heap_close(attr_rel, RowExclusiveLock);
1742 
1743 	/* Keep lock on attribute's rel until end of xact */
1744 	relation_close(myrel, NoLock);
1745 }
1746 
1747 /*
1748  * heap_drop_with_catalog	- removes specified relation from catalogs
1749  *
1750  * Note that this routine is not responsible for dropping objects that are
1751  * linked to the pg_class entry via dependencies (for example, indexes and
1752  * constraints).  Those are deleted by the dependency-tracing logic in
1753  * dependency.c before control gets here.  In general, therefore, this routine
1754  * should never be called directly; go through performDeletion() instead.
1755  */
1756 void
heap_drop_with_catalog(Oid relid)1757 heap_drop_with_catalog(Oid relid)
1758 {
1759 	Relation	rel;
1760 	HeapTuple	tuple;
1761 	Oid			parentOid = InvalidOid;
1762 
1763 	/*
1764 	 * To drop a partition safely, we must grab exclusive lock on its parent,
1765 	 * because another backend might be about to execute a query on the parent
1766 	 * table.  If it relies on previously cached partition descriptor, then it
1767 	 * could attempt to access the just-dropped relation as its partition. We
1768 	 * must therefore take a table lock strong enough to prevent all queries
1769 	 * on the table from proceeding until we commit and send out a
1770 	 * shared-cache-inval notice that will make them update their index lists.
1771 	 */
1772 	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1773 	if (!HeapTupleIsValid(tuple))
1774 		elog(ERROR, "cache lookup failed for relation %u", relid);
1775 	if (((Form_pg_class) GETSTRUCT(tuple))->relispartition)
1776 	{
1777 		parentOid = get_partition_parent(relid);
1778 		LockRelationOid(parentOid, AccessExclusiveLock);
1779 	}
1780 
1781 	ReleaseSysCache(tuple);
1782 
1783 	/*
1784 	 * Open and lock the relation.
1785 	 */
1786 	rel = relation_open(relid, AccessExclusiveLock);
1787 
1788 	/*
1789 	 * There can no longer be anyone *else* touching the relation, but we
1790 	 * might still have open queries or cursors, or pending trigger events, in
1791 	 * our own session.
1792 	 */
1793 	CheckTableNotInUse(rel, "DROP TABLE");
1794 
1795 	/*
1796 	 * This effectively deletes all rows in the table, and may be done in a
1797 	 * serializable transaction.  In that case we must record a rw-conflict in
1798 	 * to this transaction from each transaction holding a predicate lock on
1799 	 * the table.
1800 	 */
1801 	CheckTableForSerializableConflictIn(rel);
1802 
1803 	/*
1804 	 * Delete pg_foreign_table tuple first.
1805 	 */
1806 	if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1807 	{
1808 		Relation	rel;
1809 		HeapTuple	tuple;
1810 
1811 		rel = heap_open(ForeignTableRelationId, RowExclusiveLock);
1812 
1813 		tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
1814 		if (!HeapTupleIsValid(tuple))
1815 			elog(ERROR, "cache lookup failed for foreign table %u", relid);
1816 
1817 		CatalogTupleDelete(rel, &tuple->t_self);
1818 
1819 		ReleaseSysCache(tuple);
1820 		heap_close(rel, RowExclusiveLock);
1821 	}
1822 
1823 	/*
1824 	 * If a partitioned table, delete the pg_partitioned_table tuple.
1825 	 */
1826 	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1827 		RemovePartitionKeyByRelId(relid);
1828 
1829 	/*
1830 	 * Schedule unlinking of the relation's physical files at commit.
1831 	 */
1832 	if (rel->rd_rel->relkind != RELKIND_VIEW &&
1833 		rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
1834 		rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
1835 		rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1836 	{
1837 		RelationDropStorage(rel);
1838 	}
1839 
1840 	/*
1841 	 * Close relcache entry, but *keep* AccessExclusiveLock on the relation
1842 	 * until transaction commit.  This ensures no one else will try to do
1843 	 * something with the doomed relation.
1844 	 */
1845 	relation_close(rel, NoLock);
1846 
1847 	/*
1848 	 * Remove any associated relation synchronization states.
1849 	 */
1850 	RemoveSubscriptionRel(InvalidOid, relid);
1851 
1852 	/*
1853 	 * Forget any ON COMMIT action for the rel
1854 	 */
1855 	remove_on_commit_action(relid);
1856 
1857 	/*
1858 	 * Flush the relation from the relcache.  We want to do this before
1859 	 * starting to remove catalog entries, just to be certain that no relcache
1860 	 * entry rebuild will happen partway through.  (That should not really
1861 	 * matter, since we don't do CommandCounterIncrement here, but let's be
1862 	 * safe.)
1863 	 */
1864 	RelationForgetRelation(relid);
1865 
1866 	/*
1867 	 * remove inheritance information
1868 	 */
1869 	RelationRemoveInheritance(relid);
1870 
1871 	/*
1872 	 * delete statistics
1873 	 */
1874 	RemoveStatistics(relid, 0);
1875 
1876 	/*
1877 	 * delete attribute tuples
1878 	 */
1879 	DeleteAttributeTuples(relid);
1880 
1881 	/*
1882 	 * delete relation tuple
1883 	 */
1884 	DeleteRelationTuple(relid);
1885 
1886 	if (OidIsValid(parentOid))
1887 	{
1888 		/*
1889 		 * Invalidate the parent's relcache so that the partition is no longer
1890 		 * included in its partition descriptor.
1891 		 */
1892 		CacheInvalidateRelcacheByRelid(parentOid);
1893 		/* keep the lock */
1894 	}
1895 }
1896 
1897 
1898 /*
1899  * Store a default expression for column attnum of relation rel.
1900  *
1901  * Returns the OID of the new pg_attrdef tuple.
1902  */
1903 Oid
StoreAttrDefault(Relation rel,AttrNumber attnum,Node * expr,bool is_internal)1904 StoreAttrDefault(Relation rel, AttrNumber attnum,
1905 				 Node *expr, bool is_internal)
1906 {
1907 	char	   *adbin;
1908 	char	   *adsrc;
1909 	Relation	adrel;
1910 	HeapTuple	tuple;
1911 	Datum		values[4];
1912 	static bool nulls[4] = {false, false, false, false};
1913 	Relation	attrrel;
1914 	HeapTuple	atttup;
1915 	Form_pg_attribute attStruct;
1916 	Oid			attrdefOid;
1917 	ObjectAddress colobject,
1918 				defobject;
1919 
1920 	/*
1921 	 * Flatten expression to string form for storage.
1922 	 */
1923 	adbin = nodeToString(expr);
1924 
1925 	/*
1926 	 * Also deparse it to form the mostly-obsolete adsrc field.
1927 	 */
1928 	adsrc = deparse_expression(expr,
1929 							   deparse_context_for(RelationGetRelationName(rel),
1930 												   RelationGetRelid(rel)),
1931 							   false, false);
1932 
1933 	/*
1934 	 * Make the pg_attrdef entry.
1935 	 */
1936 	values[Anum_pg_attrdef_adrelid - 1] = RelationGetRelid(rel);
1937 	values[Anum_pg_attrdef_adnum - 1] = attnum;
1938 	values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
1939 	values[Anum_pg_attrdef_adsrc - 1] = CStringGetTextDatum(adsrc);
1940 
1941 	adrel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
1942 
1943 	tuple = heap_form_tuple(adrel->rd_att, values, nulls);
1944 	attrdefOid = CatalogTupleInsert(adrel, tuple);
1945 
1946 	defobject.classId = AttrDefaultRelationId;
1947 	defobject.objectId = attrdefOid;
1948 	defobject.objectSubId = 0;
1949 
1950 	heap_close(adrel, RowExclusiveLock);
1951 
1952 	/* now can free some of the stuff allocated above */
1953 	pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
1954 	pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1]));
1955 	heap_freetuple(tuple);
1956 	pfree(adbin);
1957 	pfree(adsrc);
1958 
1959 	/*
1960 	 * Update the pg_attribute entry for the column to show that a default
1961 	 * exists.
1962 	 */
1963 	attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
1964 	atttup = SearchSysCacheCopy2(ATTNUM,
1965 								 ObjectIdGetDatum(RelationGetRelid(rel)),
1966 								 Int16GetDatum(attnum));
1967 	if (!HeapTupleIsValid(atttup))
1968 		elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1969 			 attnum, RelationGetRelid(rel));
1970 	attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
1971 	if (!attStruct->atthasdef)
1972 	{
1973 		attStruct->atthasdef = true;
1974 		CatalogTupleUpdate(attrrel, &atttup->t_self, atttup);
1975 	}
1976 	heap_close(attrrel, RowExclusiveLock);
1977 	heap_freetuple(atttup);
1978 
1979 	/*
1980 	 * Make a dependency so that the pg_attrdef entry goes away if the column
1981 	 * (or whole table) is deleted.
1982 	 */
1983 	colobject.classId = RelationRelationId;
1984 	colobject.objectId = RelationGetRelid(rel);
1985 	colobject.objectSubId = attnum;
1986 
1987 	recordDependencyOn(&defobject, &colobject, DEPENDENCY_AUTO);
1988 
1989 	/*
1990 	 * Record dependencies on objects used in the expression, too.
1991 	 */
1992 	recordDependencyOnExpr(&defobject, expr, NIL, DEPENDENCY_NORMAL);
1993 
1994 	/*
1995 	 * Post creation hook for attribute defaults.
1996 	 *
1997 	 * XXX. ALTER TABLE ALTER COLUMN SET/DROP DEFAULT is implemented with a
1998 	 * couple of deletion/creation of the attribute's default entry, so the
1999 	 * callee should check existence of an older version of this entry if it
2000 	 * needs to distinguish.
2001 	 */
2002 	InvokeObjectPostCreateHookArg(AttrDefaultRelationId,
2003 								  RelationGetRelid(rel), attnum, is_internal);
2004 
2005 	return attrdefOid;
2006 }
2007 
2008 /*
2009  * Store a check-constraint expression for the given relation.
2010  *
2011  * Caller is responsible for updating the count of constraints
2012  * in the pg_class entry for the relation.
2013  *
2014  * The OID of the new constraint is returned.
2015  */
2016 static Oid
StoreRelCheck(Relation rel,char * ccname,Node * expr,bool is_validated,bool is_local,int inhcount,bool is_no_inherit,bool is_internal)2017 StoreRelCheck(Relation rel, char *ccname, Node *expr,
2018 			  bool is_validated, bool is_local, int inhcount,
2019 			  bool is_no_inherit, bool is_internal)
2020 {
2021 	char	   *ccbin;
2022 	char	   *ccsrc;
2023 	List	   *varList;
2024 	int			keycount;
2025 	int16	   *attNos;
2026 	Oid			constrOid;
2027 
2028 	/*
2029 	 * Flatten expression to string form for storage.
2030 	 */
2031 	ccbin = nodeToString(expr);
2032 
2033 	/*
2034 	 * Also deparse it to form the mostly-obsolete consrc field.
2035 	 */
2036 	ccsrc = deparse_expression(expr,
2037 							   deparse_context_for(RelationGetRelationName(rel),
2038 												   RelationGetRelid(rel)),
2039 							   false, false);
2040 
2041 	/*
2042 	 * Find columns of rel that are used in expr
2043 	 *
2044 	 * NB: pull_var_clause is okay here only because we don't allow subselects
2045 	 * in check constraints; it would fail to examine the contents of
2046 	 * subselects.
2047 	 */
2048 	varList = pull_var_clause(expr, 0);
2049 	keycount = list_length(varList);
2050 
2051 	if (keycount > 0)
2052 	{
2053 		ListCell   *vl;
2054 		int			i = 0;
2055 
2056 		attNos = (int16 *) palloc(keycount * sizeof(int16));
2057 		foreach(vl, varList)
2058 		{
2059 			Var		   *var = (Var *) lfirst(vl);
2060 			int			j;
2061 
2062 			for (j = 0; j < i; j++)
2063 				if (attNos[j] == var->varattno)
2064 					break;
2065 			if (j == i)
2066 				attNos[i++] = var->varattno;
2067 		}
2068 		keycount = i;
2069 	}
2070 	else
2071 		attNos = NULL;
2072 
2073 	/*
2074 	 * Partitioned tables do not contain any rows themselves, so a NO INHERIT
2075 	 * constraint makes no sense.
2076 	 */
2077 	if (is_no_inherit &&
2078 		rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2079 		ereport(ERROR,
2080 				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2081 				 errmsg("cannot add NO INHERIT constraint to partitioned table \"%s\"",
2082 						RelationGetRelationName(rel))));
2083 
2084 	/*
2085 	 * Create the Check Constraint
2086 	 */
2087 	constrOid =
2088 		CreateConstraintEntry(ccname,	/* Constraint Name */
2089 							  RelationGetNamespace(rel),	/* namespace */
2090 							  CONSTRAINT_CHECK, /* Constraint Type */
2091 							  false,	/* Is Deferrable */
2092 							  false,	/* Is Deferred */
2093 							  is_validated,
2094 							  RelationGetRelid(rel),	/* relation */
2095 							  attNos,	/* attrs in the constraint */
2096 							  keycount, /* # attrs in the constraint */
2097 							  InvalidOid,	/* not a domain constraint */
2098 							  InvalidOid,	/* no associated index */
2099 							  InvalidOid,	/* Foreign key fields */
2100 							  NULL,
2101 							  NULL,
2102 							  NULL,
2103 							  NULL,
2104 							  0,
2105 							  ' ',
2106 							  ' ',
2107 							  ' ',
2108 							  NULL, /* not an exclusion constraint */
2109 							  expr, /* Tree form of check constraint */
2110 							  ccbin,	/* Binary form of check constraint */
2111 							  ccsrc,	/* Source form of check constraint */
2112 							  is_local, /* conislocal */
2113 							  inhcount, /* coninhcount */
2114 							  is_no_inherit,	/* connoinherit */
2115 							  is_internal); /* internally constructed? */
2116 
2117 	pfree(ccbin);
2118 	pfree(ccsrc);
2119 
2120 	return constrOid;
2121 }
2122 
2123 /*
2124  * Store defaults and constraints (passed as a list of CookedConstraint).
2125  *
2126  * Each CookedConstraint struct is modified to store the new catalog tuple OID.
2127  *
2128  * NOTE: only pre-cooked expressions will be passed this way, which is to
2129  * say expressions inherited from an existing relation.  Newly parsed
2130  * expressions can be added later, by direct calls to StoreAttrDefault
2131  * and StoreRelCheck (see AddRelationNewConstraints()).
2132  */
2133 static void
StoreConstraints(Relation rel,List * cooked_constraints,bool is_internal)2134 StoreConstraints(Relation rel, List *cooked_constraints, bool is_internal)
2135 {
2136 	int			numchecks = 0;
2137 	ListCell   *lc;
2138 
2139 	if (cooked_constraints == NIL)
2140 		return;					/* nothing to do */
2141 
2142 	/*
2143 	 * Deparsing of constraint expressions will fail unless the just-created
2144 	 * pg_attribute tuples for this relation are made visible.  So, bump the
2145 	 * command counter.  CAUTION: this will cause a relcache entry rebuild.
2146 	 */
2147 	CommandCounterIncrement();
2148 
2149 	foreach(lc, cooked_constraints)
2150 	{
2151 		CookedConstraint *con = (CookedConstraint *) lfirst(lc);
2152 
2153 		switch (con->contype)
2154 		{
2155 			case CONSTR_DEFAULT:
2156 				con->conoid = StoreAttrDefault(rel, con->attnum, con->expr,
2157 											   is_internal);
2158 				break;
2159 			case CONSTR_CHECK:
2160 				con->conoid =
2161 					StoreRelCheck(rel, con->name, con->expr,
2162 								  !con->skip_validation, con->is_local,
2163 								  con->inhcount, con->is_no_inherit,
2164 								  is_internal);
2165 				numchecks++;
2166 				break;
2167 			default:
2168 				elog(ERROR, "unrecognized constraint type: %d",
2169 					 (int) con->contype);
2170 		}
2171 	}
2172 
2173 	if (numchecks > 0)
2174 		SetRelationNumChecks(rel, numchecks);
2175 }
2176 
2177 /*
2178  * AddRelationNewConstraints
2179  *
2180  * Add new column default expressions and/or constraint check expressions
2181  * to an existing relation.  This is defined to do both for efficiency in
2182  * DefineRelation, but of course you can do just one or the other by passing
2183  * empty lists.
2184  *
2185  * rel: relation to be modified
2186  * newColDefaults: list of RawColumnDefault structures
2187  * newConstraints: list of Constraint nodes
2188  * allow_merge: TRUE if check constraints may be merged with existing ones
2189  * is_local: TRUE if definition is local, FALSE if it's inherited
2190  * is_internal: TRUE if result of some internal process, not a user request
2191  *
2192  * All entries in newColDefaults will be processed.  Entries in newConstraints
2193  * will be processed only if they are CONSTR_CHECK type.
2194  *
2195  * Returns a list of CookedConstraint nodes that shows the cooked form of
2196  * the default and constraint expressions added to the relation.
2197  *
2198  * NB: caller should have opened rel with AccessExclusiveLock, and should
2199  * hold that lock till end of transaction.  Also, we assume the caller has
2200  * done a CommandCounterIncrement if necessary to make the relation's catalog
2201  * tuples visible.
2202  */
2203 List *
AddRelationNewConstraints(Relation rel,List * newColDefaults,List * newConstraints,bool allow_merge,bool is_local,bool is_internal)2204 AddRelationNewConstraints(Relation rel,
2205 						  List *newColDefaults,
2206 						  List *newConstraints,
2207 						  bool allow_merge,
2208 						  bool is_local,
2209 						  bool is_internal)
2210 {
2211 	List	   *cookedConstraints = NIL;
2212 	TupleDesc	tupleDesc;
2213 	TupleConstr *oldconstr;
2214 	int			numoldchecks;
2215 	ParseState *pstate;
2216 	RangeTblEntry *rte;
2217 	int			numchecks;
2218 	List	   *checknames;
2219 	ListCell   *cell;
2220 	Node	   *expr;
2221 	CookedConstraint *cooked;
2222 
2223 	/*
2224 	 * Get info about existing constraints.
2225 	 */
2226 	tupleDesc = RelationGetDescr(rel);
2227 	oldconstr = tupleDesc->constr;
2228 	if (oldconstr)
2229 		numoldchecks = oldconstr->num_check;
2230 	else
2231 		numoldchecks = 0;
2232 
2233 	/*
2234 	 * Create a dummy ParseState and insert the target relation as its sole
2235 	 * rangetable entry.  We need a ParseState for transformExpr.
2236 	 */
2237 	pstate = make_parsestate(NULL);
2238 	rte = addRangeTableEntryForRelation(pstate,
2239 										rel,
2240 										NULL,
2241 										false,
2242 										true);
2243 	addRTEtoQuery(pstate, rte, true, true, true);
2244 
2245 	/*
2246 	 * Process column default expressions.
2247 	 */
2248 	foreach(cell, newColDefaults)
2249 	{
2250 		RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
2251 		Form_pg_attribute atp = rel->rd_att->attrs[colDef->attnum - 1];
2252 		Oid			defOid;
2253 
2254 		expr = cookDefault(pstate, colDef->raw_default,
2255 						   atp->atttypid, atp->atttypmod,
2256 						   NameStr(atp->attname));
2257 
2258 		/*
2259 		 * If the expression is just a NULL constant, we do not bother to make
2260 		 * an explicit pg_attrdef entry, since the default behavior is
2261 		 * equivalent.
2262 		 *
2263 		 * Note a nonobvious property of this test: if the column is of a
2264 		 * domain type, what we'll get is not a bare null Const but a
2265 		 * CoerceToDomain expr, so we will not discard the default.  This is
2266 		 * critical because the column default needs to be retained to
2267 		 * override any default that the domain might have.
2268 		 */
2269 		if (expr == NULL ||
2270 			(IsA(expr, Const) &&((Const *) expr)->constisnull))
2271 			continue;
2272 
2273 		defOid = StoreAttrDefault(rel, colDef->attnum, expr, is_internal);
2274 
2275 		cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2276 		cooked->contype = CONSTR_DEFAULT;
2277 		cooked->conoid = defOid;
2278 		cooked->name = NULL;
2279 		cooked->attnum = colDef->attnum;
2280 		cooked->expr = expr;
2281 		cooked->skip_validation = false;
2282 		cooked->is_local = is_local;
2283 		cooked->inhcount = is_local ? 0 : 1;
2284 		cooked->is_no_inherit = false;
2285 		cookedConstraints = lappend(cookedConstraints, cooked);
2286 	}
2287 
2288 	/*
2289 	 * Process constraint expressions.
2290 	 */
2291 	numchecks = numoldchecks;
2292 	checknames = NIL;
2293 	foreach(cell, newConstraints)
2294 	{
2295 		Constraint *cdef = (Constraint *) lfirst(cell);
2296 		char	   *ccname;
2297 		Oid			constrOid;
2298 
2299 		if (cdef->contype != CONSTR_CHECK)
2300 			continue;
2301 
2302 		if (cdef->raw_expr != NULL)
2303 		{
2304 			Assert(cdef->cooked_expr == NULL);
2305 
2306 			/*
2307 			 * Transform raw parsetree to executable expression, and verify
2308 			 * it's valid as a CHECK constraint.
2309 			 */
2310 			expr = cookConstraint(pstate, cdef->raw_expr,
2311 								  RelationGetRelationName(rel));
2312 		}
2313 		else
2314 		{
2315 			Assert(cdef->cooked_expr != NULL);
2316 
2317 			/*
2318 			 * Here, we assume the parser will only pass us valid CHECK
2319 			 * expressions, so we do no particular checking.
2320 			 */
2321 			expr = stringToNode(cdef->cooked_expr);
2322 		}
2323 
2324 		/*
2325 		 * Check name uniqueness, or generate a name if none was given.
2326 		 */
2327 		if (cdef->conname != NULL)
2328 		{
2329 			ListCell   *cell2;
2330 
2331 			ccname = cdef->conname;
2332 			/* Check against other new constraints */
2333 			/* Needed because we don't do CommandCounterIncrement in loop */
2334 			foreach(cell2, checknames)
2335 			{
2336 				if (strcmp((char *) lfirst(cell2), ccname) == 0)
2337 					ereport(ERROR,
2338 							(errcode(ERRCODE_DUPLICATE_OBJECT),
2339 							 errmsg("check constraint \"%s\" already exists",
2340 									ccname)));
2341 			}
2342 
2343 			/* save name for future checks */
2344 			checknames = lappend(checknames, ccname);
2345 
2346 			/*
2347 			 * Check against pre-existing constraints.  If we are allowed to
2348 			 * merge with an existing constraint, there's no more to do here.
2349 			 * (We omit the duplicate constraint from the result, which is
2350 			 * what ATAddCheckConstraint wants.)
2351 			 */
2352 			if (MergeWithExistingConstraint(rel, ccname, expr,
2353 											allow_merge, is_local,
2354 											cdef->initially_valid,
2355 											cdef->is_no_inherit))
2356 				continue;
2357 		}
2358 		else
2359 		{
2360 			/*
2361 			 * When generating a name, we want to create "tab_col_check" for a
2362 			 * column constraint and "tab_check" for a table constraint.  We
2363 			 * no longer have any info about the syntactic positioning of the
2364 			 * constraint phrase, so we approximate this by seeing whether the
2365 			 * expression references more than one column.  (If the user
2366 			 * played by the rules, the result is the same...)
2367 			 *
2368 			 * Note: pull_var_clause() doesn't descend into sublinks, but we
2369 			 * eliminated those above; and anyway this only needs to be an
2370 			 * approximate answer.
2371 			 */
2372 			List	   *vars;
2373 			char	   *colname;
2374 
2375 			vars = pull_var_clause(expr, 0);
2376 
2377 			/* eliminate duplicates */
2378 			vars = list_union(NIL, vars);
2379 
2380 			if (list_length(vars) == 1)
2381 				colname = get_attname(RelationGetRelid(rel),
2382 									  ((Var *) linitial(vars))->varattno);
2383 			else
2384 				colname = NULL;
2385 
2386 			ccname = ChooseConstraintName(RelationGetRelationName(rel),
2387 										  colname,
2388 										  "check",
2389 										  RelationGetNamespace(rel),
2390 										  checknames);
2391 
2392 			/* save name for future checks */
2393 			checknames = lappend(checknames, ccname);
2394 		}
2395 
2396 		/*
2397 		 * OK, store it.
2398 		 */
2399 		constrOid =
2400 			StoreRelCheck(rel, ccname, expr, cdef->initially_valid, is_local,
2401 						  is_local ? 0 : 1, cdef->is_no_inherit, is_internal);
2402 
2403 		numchecks++;
2404 
2405 		cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2406 		cooked->contype = CONSTR_CHECK;
2407 		cooked->conoid = constrOid;
2408 		cooked->name = ccname;
2409 		cooked->attnum = 0;
2410 		cooked->expr = expr;
2411 		cooked->skip_validation = cdef->skip_validation;
2412 		cooked->is_local = is_local;
2413 		cooked->inhcount = is_local ? 0 : 1;
2414 		cooked->is_no_inherit = cdef->is_no_inherit;
2415 		cookedConstraints = lappend(cookedConstraints, cooked);
2416 	}
2417 
2418 	/*
2419 	 * Update the count of constraints in the relation's pg_class tuple. We do
2420 	 * this even if there was no change, in order to ensure that an SI update
2421 	 * message is sent out for the pg_class tuple, which will force other
2422 	 * backends to rebuild their relcache entries for the rel. (This is
2423 	 * critical if we added defaults but not constraints.)
2424 	 */
2425 	SetRelationNumChecks(rel, numchecks);
2426 
2427 	return cookedConstraints;
2428 }
2429 
2430 /*
2431  * Check for a pre-existing check constraint that conflicts with a proposed
2432  * new one, and either adjust its conislocal/coninhcount settings or throw
2433  * error as needed.
2434  *
2435  * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's
2436  * got a so-far-unique name, or throws error if conflict.
2437  *
2438  * XXX See MergeConstraintsIntoExisting too if you change this code.
2439  */
2440 static bool
MergeWithExistingConstraint(Relation rel,char * ccname,Node * expr,bool allow_merge,bool is_local,bool is_initially_valid,bool is_no_inherit)2441 MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
2442 							bool allow_merge, bool is_local,
2443 							bool is_initially_valid,
2444 							bool is_no_inherit)
2445 {
2446 	bool		found;
2447 	Relation	conDesc;
2448 	SysScanDesc conscan;
2449 	ScanKeyData skey[2];
2450 	HeapTuple	tup;
2451 
2452 	/* Search for a pg_constraint entry with same name and relation */
2453 	conDesc = heap_open(ConstraintRelationId, RowExclusiveLock);
2454 
2455 	found = false;
2456 
2457 	ScanKeyInit(&skey[0],
2458 				Anum_pg_constraint_conname,
2459 				BTEqualStrategyNumber, F_NAMEEQ,
2460 				CStringGetDatum(ccname));
2461 
2462 	ScanKeyInit(&skey[1],
2463 				Anum_pg_constraint_connamespace,
2464 				BTEqualStrategyNumber, F_OIDEQ,
2465 				ObjectIdGetDatum(RelationGetNamespace(rel)));
2466 
2467 	conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
2468 								 NULL, 2, skey);
2469 
2470 	while (HeapTupleIsValid(tup = systable_getnext(conscan)))
2471 	{
2472 		Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
2473 
2474 		if (con->conrelid == RelationGetRelid(rel))
2475 		{
2476 			/* Found it.  Conflicts if not identical check constraint */
2477 			if (con->contype == CONSTRAINT_CHECK)
2478 			{
2479 				Datum		val;
2480 				bool		isnull;
2481 
2482 				val = fastgetattr(tup,
2483 								  Anum_pg_constraint_conbin,
2484 								  conDesc->rd_att, &isnull);
2485 				if (isnull)
2486 					elog(ERROR, "null conbin for rel %s",
2487 						 RelationGetRelationName(rel));
2488 				if (equal(expr, stringToNode(TextDatumGetCString(val))))
2489 					found = true;
2490 			}
2491 
2492 			/*
2493 			 * If the existing constraint is purely inherited (no local
2494 			 * definition) then interpret addition of a local constraint as a
2495 			 * legal merge.  This allows ALTER ADD CONSTRAINT on parent and
2496 			 * child tables to be given in either order with same end state.
2497 			 * However if the relation is a partition, all inherited
2498 			 * constraints are always non-local, including those that were
2499 			 * merged.
2500 			 */
2501 			if (is_local && !con->conislocal && !rel->rd_rel->relispartition)
2502 				allow_merge = true;
2503 
2504 			if (!found || !allow_merge)
2505 				ereport(ERROR,
2506 						(errcode(ERRCODE_DUPLICATE_OBJECT),
2507 						 errmsg("constraint \"%s\" for relation \"%s\" already exists",
2508 								ccname, RelationGetRelationName(rel))));
2509 
2510 			/* If the child constraint is "no inherit" then cannot merge */
2511 			if (con->connoinherit)
2512 				ereport(ERROR,
2513 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2514 						 errmsg("constraint \"%s\" conflicts with non-inherited constraint on relation \"%s\"",
2515 								ccname, RelationGetRelationName(rel))));
2516 
2517 			/*
2518 			 * Must not change an existing inherited constraint to "no
2519 			 * inherit" status.  That's because inherited constraints should
2520 			 * be able to propagate to lower-level children.
2521 			 */
2522 			if (con->coninhcount > 0 && is_no_inherit)
2523 				ereport(ERROR,
2524 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2525 						 errmsg("constraint \"%s\" conflicts with inherited constraint on relation \"%s\"",
2526 								ccname, RelationGetRelationName(rel))));
2527 
2528 			/*
2529 			 * If the child constraint is "not valid" then cannot merge with a
2530 			 * valid parent constraint
2531 			 */
2532 			if (is_initially_valid && !con->convalidated)
2533 				ereport(ERROR,
2534 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2535 						 errmsg("constraint \"%s\" conflicts with NOT VALID constraint on relation \"%s\"",
2536 								ccname, RelationGetRelationName(rel))));
2537 
2538 			/* OK to update the tuple */
2539 			ereport(NOTICE,
2540 					(errmsg("merging constraint \"%s\" with inherited definition",
2541 							ccname)));
2542 
2543 			tup = heap_copytuple(tup);
2544 			con = (Form_pg_constraint) GETSTRUCT(tup);
2545 
2546 			/*
2547 			 * In case of partitions, an inherited constraint must be
2548 			 * inherited only once since it cannot have multiple parents and
2549 			 * it is never considered local.
2550 			 */
2551 			if (rel->rd_rel->relispartition)
2552 			{
2553 				con->coninhcount = 1;
2554 				con->conislocal = false;
2555 			}
2556 			else
2557 			{
2558 				if (is_local)
2559 					con->conislocal = true;
2560 				else
2561 					con->coninhcount++;
2562 			}
2563 
2564 			if (is_no_inherit)
2565 			{
2566 				Assert(is_local);
2567 				con->connoinherit = true;
2568 			}
2569 			CatalogTupleUpdate(conDesc, &tup->t_self, tup);
2570 			break;
2571 		}
2572 	}
2573 
2574 	systable_endscan(conscan);
2575 	heap_close(conDesc, RowExclusiveLock);
2576 
2577 	return found;
2578 }
2579 
2580 /*
2581  * Update the count of constraints in the relation's pg_class tuple.
2582  *
2583  * Caller had better hold exclusive lock on the relation.
2584  *
2585  * An important side effect is that a SI update message will be sent out for
2586  * the pg_class tuple, which will force other backends to rebuild their
2587  * relcache entries for the rel.  Also, this backend will rebuild its
2588  * own relcache entry at the next CommandCounterIncrement.
2589  */
2590 static void
SetRelationNumChecks(Relation rel,int numchecks)2591 SetRelationNumChecks(Relation rel, int numchecks)
2592 {
2593 	Relation	relrel;
2594 	HeapTuple	reltup;
2595 	Form_pg_class relStruct;
2596 
2597 	relrel = heap_open(RelationRelationId, RowExclusiveLock);
2598 	reltup = SearchSysCacheCopy1(RELOID,
2599 								 ObjectIdGetDatum(RelationGetRelid(rel)));
2600 	if (!HeapTupleIsValid(reltup))
2601 		elog(ERROR, "cache lookup failed for relation %u",
2602 			 RelationGetRelid(rel));
2603 	relStruct = (Form_pg_class) GETSTRUCT(reltup);
2604 
2605 	if (relStruct->relchecks != numchecks)
2606 	{
2607 		relStruct->relchecks = numchecks;
2608 
2609 		CatalogTupleUpdate(relrel, &reltup->t_self, reltup);
2610 	}
2611 	else
2612 	{
2613 		/* Skip the disk update, but force relcache inval anyway */
2614 		CacheInvalidateRelcache(rel);
2615 	}
2616 
2617 	heap_freetuple(reltup);
2618 	heap_close(relrel, RowExclusiveLock);
2619 }
2620 
2621 /*
2622  * Take a raw default and convert it to a cooked format ready for
2623  * storage.
2624  *
2625  * Parse state should be set up to recognize any vars that might appear
2626  * in the expression.  (Even though we plan to reject vars, it's more
2627  * user-friendly to give the correct error message than "unknown var".)
2628  *
2629  * If atttypid is not InvalidOid, coerce the expression to the specified
2630  * type (and typmod atttypmod).   attname is only needed in this case:
2631  * it is used in the error message, if any.
2632  */
2633 Node *
cookDefault(ParseState * pstate,Node * raw_default,Oid atttypid,int32 atttypmod,char * attname)2634 cookDefault(ParseState *pstate,
2635 			Node *raw_default,
2636 			Oid atttypid,
2637 			int32 atttypmod,
2638 			char *attname)
2639 {
2640 	Node	   *expr;
2641 
2642 	Assert(raw_default != NULL);
2643 
2644 	/*
2645 	 * Transform raw parsetree to executable expression.
2646 	 */
2647 	expr = transformExpr(pstate, raw_default, EXPR_KIND_COLUMN_DEFAULT);
2648 
2649 	/*
2650 	 * Make sure default expr does not refer to any vars (we need this check
2651 	 * since the pstate includes the target table).
2652 	 */
2653 	if (contain_var_clause(expr))
2654 		ereport(ERROR,
2655 				(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
2656 				 errmsg("cannot use column references in default expression")));
2657 
2658 	/*
2659 	 * transformExpr() should have already rejected subqueries, aggregates,
2660 	 * window functions, and SRFs, based on the EXPR_KIND_ for a default
2661 	 * expression.
2662 	 */
2663 
2664 	/*
2665 	 * Coerce the expression to the correct type and typmod, if given. This
2666 	 * should match the parser's processing of non-defaulted expressions ---
2667 	 * see transformAssignedExpr().
2668 	 */
2669 	if (OidIsValid(atttypid))
2670 	{
2671 		Oid			type_id = exprType(expr);
2672 
2673 		expr = coerce_to_target_type(pstate, expr, type_id,
2674 									 atttypid, atttypmod,
2675 									 COERCION_ASSIGNMENT,
2676 									 COERCE_IMPLICIT_CAST,
2677 									 -1);
2678 		if (expr == NULL)
2679 			ereport(ERROR,
2680 					(errcode(ERRCODE_DATATYPE_MISMATCH),
2681 					 errmsg("column \"%s\" is of type %s"
2682 							" but default expression is of type %s",
2683 							attname,
2684 							format_type_be(atttypid),
2685 							format_type_be(type_id)),
2686 					 errhint("You will need to rewrite or cast the expression.")));
2687 	}
2688 
2689 	/*
2690 	 * Finally, take care of collations in the finished expression.
2691 	 */
2692 	assign_expr_collations(pstate, expr);
2693 
2694 	return expr;
2695 }
2696 
2697 /*
2698  * Take a raw CHECK constraint expression and convert it to a cooked format
2699  * ready for storage.
2700  *
2701  * Parse state must be set up to recognize any vars that might appear
2702  * in the expression.
2703  */
2704 static Node *
cookConstraint(ParseState * pstate,Node * raw_constraint,char * relname)2705 cookConstraint(ParseState *pstate,
2706 			   Node *raw_constraint,
2707 			   char *relname)
2708 {
2709 	Node	   *expr;
2710 
2711 	/*
2712 	 * Transform raw parsetree to executable expression.
2713 	 */
2714 	expr = transformExpr(pstate, raw_constraint, EXPR_KIND_CHECK_CONSTRAINT);
2715 
2716 	/*
2717 	 * Make sure it yields a boolean result.
2718 	 */
2719 	expr = coerce_to_boolean(pstate, expr, "CHECK");
2720 
2721 	/*
2722 	 * Take care of collations.
2723 	 */
2724 	assign_expr_collations(pstate, expr);
2725 
2726 	/*
2727 	 * Make sure no outside relations are referred to (this is probably dead
2728 	 * code now that add_missing_from is history).
2729 	 */
2730 	if (list_length(pstate->p_rtable) != 1)
2731 		ereport(ERROR,
2732 				(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
2733 				 errmsg("only table \"%s\" can be referenced in check constraint",
2734 						relname)));
2735 
2736 	return expr;
2737 }
2738 
2739 
2740 /*
2741  * RemoveStatistics --- remove entries in pg_statistic for a rel or column
2742  *
2743  * If attnum is zero, remove all entries for rel; else remove only the one(s)
2744  * for that column.
2745  */
2746 void
RemoveStatistics(Oid relid,AttrNumber attnum)2747 RemoveStatistics(Oid relid, AttrNumber attnum)
2748 {
2749 	Relation	pgstatistic;
2750 	SysScanDesc scan;
2751 	ScanKeyData key[2];
2752 	int			nkeys;
2753 	HeapTuple	tuple;
2754 
2755 	pgstatistic = heap_open(StatisticRelationId, RowExclusiveLock);
2756 
2757 	ScanKeyInit(&key[0],
2758 				Anum_pg_statistic_starelid,
2759 				BTEqualStrategyNumber, F_OIDEQ,
2760 				ObjectIdGetDatum(relid));
2761 
2762 	if (attnum == 0)
2763 		nkeys = 1;
2764 	else
2765 	{
2766 		ScanKeyInit(&key[1],
2767 					Anum_pg_statistic_staattnum,
2768 					BTEqualStrategyNumber, F_INT2EQ,
2769 					Int16GetDatum(attnum));
2770 		nkeys = 2;
2771 	}
2772 
2773 	scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
2774 							  NULL, nkeys, key);
2775 
2776 	/* we must loop even when attnum != 0, in case of inherited stats */
2777 	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
2778 		CatalogTupleDelete(pgstatistic, &tuple->t_self);
2779 
2780 	systable_endscan(scan);
2781 
2782 	heap_close(pgstatistic, RowExclusiveLock);
2783 }
2784 
2785 
2786 /*
2787  * RelationTruncateIndexes - truncate all indexes associated
2788  * with the heap relation to zero tuples.
2789  *
2790  * The routine will truncate and then reconstruct the indexes on
2791  * the specified relation.  Caller must hold exclusive lock on rel.
2792  */
2793 static void
RelationTruncateIndexes(Relation heapRelation)2794 RelationTruncateIndexes(Relation heapRelation)
2795 {
2796 	ListCell   *indlist;
2797 
2798 	/* Ask the relcache to produce a list of the indexes of the rel */
2799 	foreach(indlist, RelationGetIndexList(heapRelation))
2800 	{
2801 		Oid			indexId = lfirst_oid(indlist);
2802 		Relation	currentIndex;
2803 		IndexInfo  *indexInfo;
2804 
2805 		/* Open the index relation; use exclusive lock, just to be sure */
2806 		currentIndex = index_open(indexId, AccessExclusiveLock);
2807 
2808 		/*
2809 		 * Fetch info needed for index_build.  Since we know there are no
2810 		 * tuples that actually need indexing, we can use a dummy IndexInfo.
2811 		 * This is slightly cheaper to build, but the real point is to avoid
2812 		 * possibly running user-defined code in index expressions or
2813 		 * predicates.  We might be getting invoked during ON COMMIT
2814 		 * processing, and we don't want to run any such code then.
2815 		 */
2816 		indexInfo = BuildDummyIndexInfo(currentIndex);
2817 
2818 		/*
2819 		 * Now truncate the actual file (and discard buffers).
2820 		 */
2821 		RelationTruncate(currentIndex, 0);
2822 
2823 		/* Initialize the index and rebuild */
2824 		/* Note: we do not need to re-establish pkey setting */
2825 		index_build(heapRelation, currentIndex, indexInfo, false, true);
2826 
2827 		/* We're done with this index */
2828 		index_close(currentIndex, NoLock);
2829 	}
2830 }
2831 
2832 /*
2833  *	 heap_truncate
2834  *
2835  *	 This routine deletes all data within all the specified relations.
2836  *
2837  * This is not transaction-safe!  There is another, transaction-safe
2838  * implementation in commands/tablecmds.c.  We now use this only for
2839  * ON COMMIT truncation of temporary tables, where it doesn't matter.
2840  */
2841 void
heap_truncate(List * relids)2842 heap_truncate(List *relids)
2843 {
2844 	List	   *relations = NIL;
2845 	ListCell   *cell;
2846 
2847 	/* Open relations for processing, and grab exclusive access on each */
2848 	foreach(cell, relids)
2849 	{
2850 		Oid			rid = lfirst_oid(cell);
2851 		Relation	rel;
2852 
2853 		rel = heap_open(rid, AccessExclusiveLock);
2854 		relations = lappend(relations, rel);
2855 	}
2856 
2857 	/* Don't allow truncate on tables that are referenced by foreign keys */
2858 	heap_truncate_check_FKs(relations, true);
2859 
2860 	/* OK to do it */
2861 	foreach(cell, relations)
2862 	{
2863 		Relation	rel = lfirst(cell);
2864 
2865 		/* Truncate the relation */
2866 		heap_truncate_one_rel(rel);
2867 
2868 		/* Close the relation, but keep exclusive lock on it until commit */
2869 		heap_close(rel, NoLock);
2870 	}
2871 }
2872 
2873 /*
2874  *	 heap_truncate_one_rel
2875  *
2876  *	 This routine deletes all data within the specified relation.
2877  *
2878  * This is not transaction-safe, because the truncation is done immediately
2879  * and cannot be rolled back later.  Caller is responsible for having
2880  * checked permissions etc, and must have obtained AccessExclusiveLock.
2881  */
2882 void
heap_truncate_one_rel(Relation rel)2883 heap_truncate_one_rel(Relation rel)
2884 {
2885 	Oid			toastrelid;
2886 
2887 	/*
2888 	 * Truncate the relation.  Partitioned tables have no storage, so there is
2889 	 * nothing to do for them here.
2890 	 */
2891 	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2892 		return;
2893 
2894 	/* Truncate the actual file (and discard buffers) */
2895 	RelationTruncate(rel, 0);
2896 
2897 	/* If the relation has indexes, truncate the indexes too */
2898 	RelationTruncateIndexes(rel);
2899 
2900 	/* If there is a toast table, truncate that too */
2901 	toastrelid = rel->rd_rel->reltoastrelid;
2902 	if (OidIsValid(toastrelid))
2903 	{
2904 		Relation	toastrel = heap_open(toastrelid, AccessExclusiveLock);
2905 
2906 		RelationTruncate(toastrel, 0);
2907 		RelationTruncateIndexes(toastrel);
2908 		/* keep the lock... */
2909 		heap_close(toastrel, NoLock);
2910 	}
2911 }
2912 
2913 /*
2914  * heap_truncate_check_FKs
2915  *		Check for foreign keys referencing a list of relations that
2916  *		are to be truncated, and raise error if there are any
2917  *
2918  * We disallow such FKs (except self-referential ones) since the whole point
2919  * of TRUNCATE is to not scan the individual rows to be thrown away.
2920  *
2921  * This is split out so it can be shared by both implementations of truncate.
2922  * Caller should already hold a suitable lock on the relations.
2923  *
2924  * tempTables is only used to select an appropriate error message.
2925  */
2926 void
heap_truncate_check_FKs(List * relations,bool tempTables)2927 heap_truncate_check_FKs(List *relations, bool tempTables)
2928 {
2929 	List	   *oids = NIL;
2930 	List	   *dependents;
2931 	ListCell   *cell;
2932 
2933 	/*
2934 	 * Build a list of OIDs of the interesting relations.
2935 	 *
2936 	 * If a relation has no triggers, then it can neither have FKs nor be
2937 	 * referenced by a FK from another table, so we can ignore it.
2938 	 */
2939 	foreach(cell, relations)
2940 	{
2941 		Relation	rel = lfirst(cell);
2942 
2943 		if (rel->rd_rel->relhastriggers)
2944 			oids = lappend_oid(oids, RelationGetRelid(rel));
2945 	}
2946 
2947 	/*
2948 	 * Fast path: if no relation has triggers, none has FKs either.
2949 	 */
2950 	if (oids == NIL)
2951 		return;
2952 
2953 	/*
2954 	 * Otherwise, must scan pg_constraint.  We make one pass with all the
2955 	 * relations considered; if this finds nothing, then all is well.
2956 	 */
2957 	dependents = heap_truncate_find_FKs(oids);
2958 	if (dependents == NIL)
2959 		return;
2960 
2961 	/*
2962 	 * Otherwise we repeat the scan once per relation to identify a particular
2963 	 * pair of relations to complain about.  This is pretty slow, but
2964 	 * performance shouldn't matter much in a failure path.  The reason for
2965 	 * doing things this way is to ensure that the message produced is not
2966 	 * dependent on chance row locations within pg_constraint.
2967 	 */
2968 	foreach(cell, oids)
2969 	{
2970 		Oid			relid = lfirst_oid(cell);
2971 		ListCell   *cell2;
2972 
2973 		dependents = heap_truncate_find_FKs(list_make1_oid(relid));
2974 
2975 		foreach(cell2, dependents)
2976 		{
2977 			Oid			relid2 = lfirst_oid(cell2);
2978 
2979 			if (!list_member_oid(oids, relid2))
2980 			{
2981 				char	   *relname = get_rel_name(relid);
2982 				char	   *relname2 = get_rel_name(relid2);
2983 
2984 				if (tempTables)
2985 					ereport(ERROR,
2986 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2987 							 errmsg("unsupported ON COMMIT and foreign key combination"),
2988 							 errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
2989 									   relname2, relname)));
2990 				else
2991 					ereport(ERROR,
2992 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2993 							 errmsg("cannot truncate a table referenced in a foreign key constraint"),
2994 							 errdetail("Table \"%s\" references \"%s\".",
2995 									   relname2, relname),
2996 							 errhint("Truncate table \"%s\" at the same time, "
2997 									 "or use TRUNCATE ... CASCADE.",
2998 									 relname2)));
2999 			}
3000 		}
3001 	}
3002 }
3003 
3004 /*
3005  * heap_truncate_find_FKs
3006  *		Find relations having foreign keys referencing any of the given rels
3007  *
3008  * Input and result are both lists of relation OIDs.  The result contains
3009  * no duplicates, does *not* include any rels that were already in the input
3010  * list, and is sorted in OID order.  (The last property is enforced mainly
3011  * to guarantee consistent behavior in the regression tests; we don't want
3012  * behavior to change depending on chance locations of rows in pg_constraint.)
3013  *
3014  * Note: caller should already have appropriate lock on all rels mentioned
3015  * in relationIds.  Since adding or dropping an FK requires exclusive lock
3016  * on both rels, this ensures that the answer will be stable.
3017  */
3018 List *
heap_truncate_find_FKs(List * relationIds)3019 heap_truncate_find_FKs(List *relationIds)
3020 {
3021 	List	   *result = NIL;
3022 	Relation	fkeyRel;
3023 	SysScanDesc fkeyScan;
3024 	HeapTuple	tuple;
3025 
3026 	/*
3027 	 * Must scan pg_constraint.  Right now, it is a seqscan because there is
3028 	 * no available index on confrelid.
3029 	 */
3030 	fkeyRel = heap_open(ConstraintRelationId, AccessShareLock);
3031 
3032 	fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
3033 								  NULL, 0, NULL);
3034 
3035 	while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
3036 	{
3037 		Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
3038 
3039 		/* Not a foreign key */
3040 		if (con->contype != CONSTRAINT_FOREIGN)
3041 			continue;
3042 
3043 		/* Not referencing one of our list of tables */
3044 		if (!list_member_oid(relationIds, con->confrelid))
3045 			continue;
3046 
3047 		/* Add referencer unless already in input or result list */
3048 		if (!list_member_oid(relationIds, con->conrelid))
3049 			result = insert_ordered_unique_oid(result, con->conrelid);
3050 	}
3051 
3052 	systable_endscan(fkeyScan);
3053 	heap_close(fkeyRel, AccessShareLock);
3054 
3055 	return result;
3056 }
3057 
3058 /*
3059  * insert_ordered_unique_oid
3060  *		Insert a new Oid into a sorted list of Oids, preserving ordering,
3061  *		and eliminating duplicates
3062  *
3063  * Building the ordered list this way is O(N^2), but with a pretty small
3064  * constant, so for the number of entries we expect it will probably be
3065  * faster than trying to apply qsort().  It seems unlikely someone would be
3066  * trying to truncate a table with thousands of dependent tables ...
3067  */
3068 static List *
insert_ordered_unique_oid(List * list,Oid datum)3069 insert_ordered_unique_oid(List *list, Oid datum)
3070 {
3071 	ListCell   *prev;
3072 
3073 	/* Does the datum belong at the front? */
3074 	if (list == NIL || datum < linitial_oid(list))
3075 		return lcons_oid(datum, list);
3076 	/* Does it match the first entry? */
3077 	if (datum == linitial_oid(list))
3078 		return list;			/* duplicate, so don't insert */
3079 	/* No, so find the entry it belongs after */
3080 	prev = list_head(list);
3081 	for (;;)
3082 	{
3083 		ListCell   *curr = lnext(prev);
3084 
3085 		if (curr == NULL || datum < lfirst_oid(curr))
3086 			break;				/* it belongs after 'prev', before 'curr' */
3087 
3088 		if (datum == lfirst_oid(curr))
3089 			return list;		/* duplicate, so don't insert */
3090 
3091 		prev = curr;
3092 	}
3093 	/* Insert datum into list after 'prev' */
3094 	lappend_cell_oid(list, prev, datum);
3095 	return list;
3096 }
3097 
3098 /*
3099  * StorePartitionKey
3100  *		Store information about the partition key rel into the catalog
3101  */
3102 void
StorePartitionKey(Relation rel,char strategy,int16 partnatts,AttrNumber * partattrs,List * partexprs,Oid * partopclass,Oid * partcollation)3103 StorePartitionKey(Relation rel,
3104 				  char strategy,
3105 				  int16 partnatts,
3106 				  AttrNumber *partattrs,
3107 				  List *partexprs,
3108 				  Oid *partopclass,
3109 				  Oid *partcollation)
3110 {
3111 	int			i;
3112 	int2vector *partattrs_vec;
3113 	oidvector  *partopclass_vec;
3114 	oidvector  *partcollation_vec;
3115 	Datum		partexprDatum;
3116 	Relation	pg_partitioned_table;
3117 	HeapTuple	tuple;
3118 	Datum		values[Natts_pg_partitioned_table];
3119 	bool		nulls[Natts_pg_partitioned_table];
3120 	ObjectAddress myself;
3121 	ObjectAddress referenced;
3122 
3123 	Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
3124 
3125 	/* Copy the partition attribute numbers, opclass OIDs into arrays */
3126 	partattrs_vec = buildint2vector(partattrs, partnatts);
3127 	partopclass_vec = buildoidvector(partopclass, partnatts);
3128 	partcollation_vec = buildoidvector(partcollation, partnatts);
3129 
3130 	/* Convert the expressions (if any) to a text datum */
3131 	if (partexprs)
3132 	{
3133 		char	   *exprString;
3134 
3135 		exprString = nodeToString(partexprs);
3136 		partexprDatum = CStringGetTextDatum(exprString);
3137 		pfree(exprString);
3138 	}
3139 	else
3140 		partexprDatum = (Datum) 0;
3141 
3142 	pg_partitioned_table = heap_open(PartitionedRelationId, RowExclusiveLock);
3143 
3144 	MemSet(nulls, false, sizeof(nulls));
3145 
3146 	/* Only this can ever be NULL */
3147 	if (!partexprDatum)
3148 		nulls[Anum_pg_partitioned_table_partexprs - 1] = true;
3149 
3150 	values[Anum_pg_partitioned_table_partrelid - 1] = ObjectIdGetDatum(RelationGetRelid(rel));
3151 	values[Anum_pg_partitioned_table_partstrat - 1] = CharGetDatum(strategy);
3152 	values[Anum_pg_partitioned_table_partnatts - 1] = Int16GetDatum(partnatts);
3153 	values[Anum_pg_partitioned_table_partattrs - 1] = PointerGetDatum(partattrs_vec);
3154 	values[Anum_pg_partitioned_table_partclass - 1] = PointerGetDatum(partopclass_vec);
3155 	values[Anum_pg_partitioned_table_partcollation - 1] = PointerGetDatum(partcollation_vec);
3156 	values[Anum_pg_partitioned_table_partexprs - 1] = partexprDatum;
3157 
3158 	tuple = heap_form_tuple(RelationGetDescr(pg_partitioned_table), values, nulls);
3159 
3160 	CatalogTupleInsert(pg_partitioned_table, tuple);
3161 	heap_close(pg_partitioned_table, RowExclusiveLock);
3162 
3163 	/* Mark this relation as dependent on a few things as follows */
3164 	myself.classId = RelationRelationId;
3165 	myself.objectId = RelationGetRelid(rel);
3166 	myself.objectSubId = 0;
3167 
3168 	/* Operator class and collation per key column */
3169 	for (i = 0; i < partnatts; i++)
3170 	{
3171 		referenced.classId = OperatorClassRelationId;
3172 		referenced.objectId = partopclass[i];
3173 		referenced.objectSubId = 0;
3174 
3175 		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3176 
3177 		/* The default collation is pinned, so don't bother recording it */
3178 		if (OidIsValid(partcollation[i]) &&
3179 			partcollation[i] != DEFAULT_COLLATION_OID)
3180 		{
3181 			referenced.classId = CollationRelationId;
3182 			referenced.objectId = partcollation[i];
3183 			referenced.objectSubId = 0;
3184 
3185 			recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3186 		}
3187 	}
3188 
3189 	/*
3190 	 * The partitioning columns are made internally dependent on the table,
3191 	 * because we cannot drop any of them without dropping the whole table.
3192 	 * (ATExecDropColumn independently enforces that, but it's not bulletproof
3193 	 * so we need the dependencies too.)
3194 	 */
3195 	for (i = 0; i < partnatts; i++)
3196 	{
3197 		if (partattrs[i] == 0)
3198 			continue;			/* ignore expressions here */
3199 
3200 		referenced.classId = RelationRelationId;
3201 		referenced.objectId = RelationGetRelid(rel);
3202 		referenced.objectSubId = partattrs[i];
3203 
3204 		recordDependencyOn(&referenced, &myself, DEPENDENCY_INTERNAL);
3205 	}
3206 
3207 	/*
3208 	 * Also consider anything mentioned in partition expressions.  External
3209 	 * references (e.g. functions) get NORMAL dependencies.  Table columns
3210 	 * mentioned in the expressions are handled the same as plain partitioning
3211 	 * columns, i.e. they become internally dependent on the whole table.
3212 	 */
3213 	if (partexprs)
3214 		recordDependencyOnSingleRelExpr(&myself,
3215 										(Node *) partexprs,
3216 										RelationGetRelid(rel),
3217 										DEPENDENCY_NORMAL,
3218 										DEPENDENCY_INTERNAL,
3219 										true /* reverse the self-deps */ );
3220 
3221 	/*
3222 	 * We must invalidate the relcache so that the next
3223 	 * CommandCounterIncrement() will cause the same to be rebuilt using the
3224 	 * information in just created catalog entry.
3225 	 */
3226 	CacheInvalidateRelcache(rel);
3227 }
3228 
3229 /*
3230  *	RemovePartitionKeyByRelId
3231  *		Remove pg_partitioned_table entry for a relation
3232  */
3233 void
RemovePartitionKeyByRelId(Oid relid)3234 RemovePartitionKeyByRelId(Oid relid)
3235 {
3236 	Relation	rel;
3237 	HeapTuple	tuple;
3238 
3239 	rel = heap_open(PartitionedRelationId, RowExclusiveLock);
3240 
3241 	tuple = SearchSysCache1(PARTRELID, ObjectIdGetDatum(relid));
3242 	if (!HeapTupleIsValid(tuple))
3243 		elog(ERROR, "cache lookup failed for partition key of relation %u",
3244 			 relid);
3245 
3246 	CatalogTupleDelete(rel, &tuple->t_self);
3247 
3248 	ReleaseSysCache(tuple);
3249 	heap_close(rel, RowExclusiveLock);
3250 }
3251 
3252 /*
3253  * StorePartitionBound
3254  *		Update pg_class tuple of rel to store the partition bound and set
3255  *		relispartition to true
3256  *
3257  * Also, invalidate the parent's relcache, so that the next rebuild will load
3258  * the new partition's info into its partition descriptor.
3259  */
3260 void
StorePartitionBound(Relation rel,Relation parent,PartitionBoundSpec * bound)3261 StorePartitionBound(Relation rel, Relation parent, PartitionBoundSpec *bound)
3262 {
3263 	Relation	classRel;
3264 	HeapTuple	tuple,
3265 				newtuple;
3266 	Datum		new_val[Natts_pg_class];
3267 	bool		new_null[Natts_pg_class],
3268 				new_repl[Natts_pg_class];
3269 
3270 	/* Update pg_class tuple */
3271 	classRel = heap_open(RelationRelationId, RowExclusiveLock);
3272 	tuple = SearchSysCacheCopy1(RELOID,
3273 								ObjectIdGetDatum(RelationGetRelid(rel)));
3274 	if (!HeapTupleIsValid(tuple))
3275 		elog(ERROR, "cache lookup failed for relation %u",
3276 			 RelationGetRelid(rel));
3277 
3278 #ifdef USE_ASSERT_CHECKING
3279 	{
3280 		Form_pg_class classForm;
3281 		bool		isnull;
3282 
3283 		classForm = (Form_pg_class) GETSTRUCT(tuple);
3284 		Assert(!classForm->relispartition);
3285 		(void) SysCacheGetAttr(RELOID, tuple, Anum_pg_class_relpartbound,
3286 							   &isnull);
3287 		Assert(isnull);
3288 	}
3289 #endif
3290 
3291 	/* Fill in relpartbound value */
3292 	memset(new_val, 0, sizeof(new_val));
3293 	memset(new_null, false, sizeof(new_null));
3294 	memset(new_repl, false, sizeof(new_repl));
3295 	new_val[Anum_pg_class_relpartbound - 1] = CStringGetTextDatum(nodeToString(bound));
3296 	new_null[Anum_pg_class_relpartbound - 1] = false;
3297 	new_repl[Anum_pg_class_relpartbound - 1] = true;
3298 	newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
3299 								 new_val, new_null, new_repl);
3300 	/* Also set the flag */
3301 	((Form_pg_class) GETSTRUCT(newtuple))->relispartition = true;
3302 	CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
3303 	heap_freetuple(newtuple);
3304 	heap_close(classRel, RowExclusiveLock);
3305 
3306 	CacheInvalidateRelcache(parent);
3307 }
3308