1 /*-------------------------------------------------------------------------
2 *
3 * heap.c
4 * code to create and destroy POSTGRES heap relations
5 *
6 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/catalog/heap.c
12 *
13 *
14 * INTERFACE ROUTINES
15 * heap_create() - Create an uncataloged heap relation
16 * heap_create_with_catalog() - Create a cataloged relation
17 * heap_drop_with_catalog() - Removes named relation from catalogs
18 *
19 * NOTES
20 * this code taken from access/heap/create.c, which contains
21 * the old heap_create_with_catalog, amcreate, and amdestroy.
22 * those routines will soon call these routines using the function
23 * manager,
24 * just like the poorly named "NewXXX" routines do. The
25 * "New" routines are all going to die soon, once and for all!
26 * -cim 1/13/91
27 *
28 *-------------------------------------------------------------------------
29 */
30 #include "postgres.h"
31
32 #include "access/genam.h"
33 #include "access/htup_details.h"
34 #include "access/multixact.h"
35 #include "access/relation.h"
36 #include "access/sysattr.h"
37 #include "access/table.h"
38 #include "access/tableam.h"
39 #include "access/transam.h"
40 #include "access/xact.h"
41 #include "access/xlog.h"
42 #include "catalog/binary_upgrade.h"
43 #include "catalog/catalog.h"
44 #include "catalog/dependency.h"
45 #include "catalog/heap.h"
46 #include "catalog/index.h"
47 #include "catalog/objectaccess.h"
48 #include "catalog/partition.h"
49 #include "catalog/pg_am.h"
50 #include "catalog/pg_attrdef.h"
51 #include "catalog/pg_collation.h"
52 #include "catalog/pg_constraint.h"
53 #include "catalog/pg_foreign_table.h"
54 #include "catalog/pg_inherits.h"
55 #include "catalog/pg_namespace.h"
56 #include "catalog/pg_opclass.h"
57 #include "catalog/pg_partitioned_table.h"
58 #include "catalog/pg_statistic.h"
59 #include "catalog/pg_subscription_rel.h"
60 #include "catalog/pg_tablespace.h"
61 #include "catalog/pg_type.h"
62 #include "catalog/storage.h"
63 #include "catalog/storage_xlog.h"
64 #include "commands/tablecmds.h"
65 #include "commands/typecmds.h"
66 #include "executor/executor.h"
67 #include "miscadmin.h"
68 #include "nodes/nodeFuncs.h"
69 #include "optimizer/optimizer.h"
70 #include "parser/parse_coerce.h"
71 #include "parser/parse_collate.h"
72 #include "parser/parse_expr.h"
73 #include "parser/parse_relation.h"
74 #include "parser/parsetree.h"
75 #include "partitioning/partdesc.h"
76 #include "storage/lmgr.h"
77 #include "storage/predicate.h"
78 #include "storage/smgr.h"
79 #include "utils/acl.h"
80 #include "utils/builtins.h"
81 #include "utils/datum.h"
82 #include "utils/fmgroids.h"
83 #include "utils/inval.h"
84 #include "utils/lsyscache.h"
85 #include "utils/partcache.h"
86 #include "utils/ruleutils.h"
87 #include "utils/snapmgr.h"
88 #include "utils/syscache.h"
89
90
/*
 * Potentially set by pg_upgrade_support functions.
 *
 * Both start out as InvalidOid.  Nothing in the nearby code reads them;
 * presumably they carry a preassigned pg_class OID for the next heap or
 * toast relation created during binary upgrade -- confirm against the
 * code that consumes them.
 */
Oid			binary_upgrade_next_heap_pg_class_oid = InvalidOid;
Oid			binary_upgrade_next_toast_pg_class_oid = InvalidOid;
94
/*
 * Forward declarations of file-local (static) helper routines, so they can
 * be referenced before their definitions appear.
 */

/* pg_class / pg_type registration helpers */
static void AddNewRelationTuple(Relation pg_class_desc,
								Relation new_rel_desc,
								Oid new_rel_oid,
								Oid new_type_oid,
								Oid reloftype,
								Oid relowner,
								char relkind,
								TransactionId relfrozenxid,
								TransactionId relminmxid,
								Datum relacl,
								Datum reloptions);
static ObjectAddress AddNewRelationType(const char *typeName,
										Oid typeNamespace,
										Oid new_rel_oid,
										char new_rel_kind,
										Oid ownerid,
										Oid new_row_type,
										Oid new_array_type);

/* inheritance bookkeeping */
static void RelationRemoveInheritance(Oid relid);

/* constraint storage and processing helpers */
static Oid	StoreRelCheck(Relation rel, const char *ccname, Node *expr,
						  bool is_validated, bool is_local, int inhcount,
						  bool is_no_inherit, bool is_internal);
static void StoreConstraints(Relation rel, List *cooked_constraints,
							 bool is_internal);
static bool MergeWithExistingConstraint(Relation rel, const char *ccname, Node *expr,
										bool allow_merge, bool is_local,
										bool is_initially_valid,
										bool is_no_inherit);
static void SetRelationNumChecks(Relation rel, int numchecks);
static Node *cookConstraint(ParseState *pstate,
							Node *raw_constraint,
							char *relname);
127
128
129 /* ----------------------------------------------------------------
130 * XXX UGLY HARD CODED BADNESS FOLLOWS XXX
131 *
132 * these should all be moved to someplace in the lib/catalog
133 * module, if not obliterated first.
134 * ----------------------------------------------------------------
135 */
136
137
138 /*
139 * Note:
140 * Should the system special case these attributes in the future?
141 * Advantage: consume much less space in the ATTRIBUTE relation.
142 * Disadvantage: special cases will be all over the place.
143 */
144
145 /*
146 * The initializers below do not include trailing variable length fields,
147 * but that's OK - we're never going to reference anything beyond the
148 * fixed-size portion of the structure anyway.
149 */
150
151 static const FormData_pg_attribute a1 = {
152 .attname = {"ctid"},
153 .atttypid = TIDOID,
154 .attlen = sizeof(ItemPointerData),
155 .attnum = SelfItemPointerAttributeNumber,
156 .attcacheoff = -1,
157 .atttypmod = -1,
158 .attbyval = false,
159 .attstorage = TYPSTORAGE_PLAIN,
160 .attalign = TYPALIGN_SHORT,
161 .attnotnull = true,
162 .attislocal = true,
163 };
164
165 static const FormData_pg_attribute a2 = {
166 .attname = {"xmin"},
167 .atttypid = XIDOID,
168 .attlen = sizeof(TransactionId),
169 .attnum = MinTransactionIdAttributeNumber,
170 .attcacheoff = -1,
171 .atttypmod = -1,
172 .attbyval = true,
173 .attstorage = TYPSTORAGE_PLAIN,
174 .attalign = TYPALIGN_INT,
175 .attnotnull = true,
176 .attislocal = true,
177 };
178
179 static const FormData_pg_attribute a3 = {
180 .attname = {"cmin"},
181 .atttypid = CIDOID,
182 .attlen = sizeof(CommandId),
183 .attnum = MinCommandIdAttributeNumber,
184 .attcacheoff = -1,
185 .atttypmod = -1,
186 .attbyval = true,
187 .attstorage = TYPSTORAGE_PLAIN,
188 .attalign = TYPALIGN_INT,
189 .attnotnull = true,
190 .attislocal = true,
191 };
192
193 static const FormData_pg_attribute a4 = {
194 .attname = {"xmax"},
195 .atttypid = XIDOID,
196 .attlen = sizeof(TransactionId),
197 .attnum = MaxTransactionIdAttributeNumber,
198 .attcacheoff = -1,
199 .atttypmod = -1,
200 .attbyval = true,
201 .attstorage = TYPSTORAGE_PLAIN,
202 .attalign = TYPALIGN_INT,
203 .attnotnull = true,
204 .attislocal = true,
205 };
206
207 static const FormData_pg_attribute a5 = {
208 .attname = {"cmax"},
209 .atttypid = CIDOID,
210 .attlen = sizeof(CommandId),
211 .attnum = MaxCommandIdAttributeNumber,
212 .attcacheoff = -1,
213 .atttypmod = -1,
214 .attbyval = true,
215 .attstorage = TYPSTORAGE_PLAIN,
216 .attalign = TYPALIGN_INT,
217 .attnotnull = true,
218 .attislocal = true,
219 };
220
221 /*
222 * We decided to call this attribute "tableoid" rather than say
223 * "classoid" on the basis that in the future there may be more than one
224 * table of a particular class/type. In any case table is still the word
225 * used in SQL.
226 */
227 static const FormData_pg_attribute a6 = {
228 .attname = {"tableoid"},
229 .atttypid = OIDOID,
230 .attlen = sizeof(Oid),
231 .attnum = TableOidAttributeNumber,
232 .attcacheoff = -1,
233 .atttypmod = -1,
234 .attbyval = true,
235 .attstorage = TYPSTORAGE_PLAIN,
236 .attalign = TYPALIGN_INT,
237 .attnotnull = true,
238 .attislocal = true,
239 };
240
241 static const FormData_pg_attribute *SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6};
242
243 /*
244 * This function returns a Form_pg_attribute pointer for a system attribute.
245 * Note that we elog if the presented attno is invalid, which would only
246 * happen if there's a problem upstream.
247 */
248 const FormData_pg_attribute *
SystemAttributeDefinition(AttrNumber attno)249 SystemAttributeDefinition(AttrNumber attno)
250 {
251 if (attno >= 0 || attno < -(int) lengthof(SysAtt))
252 elog(ERROR, "invalid system attribute number %d", attno);
253 return SysAtt[-attno - 1];
254 }
255
256 /*
257 * If the given name is a system attribute name, return a Form_pg_attribute
258 * pointer for a prototype definition. If not, return NULL.
259 */
260 const FormData_pg_attribute *
SystemAttributeByName(const char * attname)261 SystemAttributeByName(const char *attname)
262 {
263 int j;
264
265 for (j = 0; j < (int) lengthof(SysAtt); j++)
266 {
267 const FormData_pg_attribute *att = SysAtt[j];
268
269 if (strcmp(NameStr(att->attname), attname) == 0)
270 return att;
271 }
272
273 return NULL;
274 }
275
276
277 /* ----------------------------------------------------------------
278 * XXX END OF UGLY HARD CODED BADNESS XXX
279 * ---------------------------------------------------------------- */
280
281
282 /* ----------------------------------------------------------------
283 * heap_create - Create an uncataloged heap relation
284 *
285 * Note API change: the caller must now always provide the OID
286 * to use for the relation. The relfilenode may (and, normally,
287 * should) be left unspecified.
288 *
289 * rel->rd_rel is initialized by RelationBuildLocalRelation,
290 * and is mostly zeroes at return.
291 * ----------------------------------------------------------------
292 */
293 Relation
heap_create(const char * relname,Oid relnamespace,Oid reltablespace,Oid relid,Oid relfilenode,Oid accessmtd,TupleDesc tupDesc,char relkind,char relpersistence,bool shared_relation,bool mapped_relation,bool allow_system_table_mods,TransactionId * relfrozenxid,MultiXactId * relminmxid)294 heap_create(const char *relname,
295 Oid relnamespace,
296 Oid reltablespace,
297 Oid relid,
298 Oid relfilenode,
299 Oid accessmtd,
300 TupleDesc tupDesc,
301 char relkind,
302 char relpersistence,
303 bool shared_relation,
304 bool mapped_relation,
305 bool allow_system_table_mods,
306 TransactionId *relfrozenxid,
307 MultiXactId *relminmxid)
308 {
309 bool create_storage;
310 Relation rel;
311
312 /* The caller must have provided an OID for the relation. */
313 Assert(OidIsValid(relid));
314
315 /*
316 * Don't allow creating relations in pg_catalog directly, even though it
317 * is allowed to move user defined relations there. Semantics with search
318 * paths including pg_catalog are too confusing for now.
319 *
320 * But allow creating indexes on relations in pg_catalog even if
321 * allow_system_table_mods = off, upper layers already guarantee it's on a
322 * user defined relation, not a system one.
323 */
324 if (!allow_system_table_mods &&
325 ((IsCatalogNamespace(relnamespace) && relkind != RELKIND_INDEX) ||
326 IsToastNamespace(relnamespace)) &&
327 IsNormalProcessingMode())
328 ereport(ERROR,
329 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
330 errmsg("permission denied to create \"%s.%s\"",
331 get_namespace_name(relnamespace), relname),
332 errdetail("System catalog modifications are currently disallowed.")));
333
334 *relfrozenxid = InvalidTransactionId;
335 *relminmxid = InvalidMultiXactId;
336
337 /* Handle reltablespace for specific relkinds. */
338 switch (relkind)
339 {
340 case RELKIND_VIEW:
341 case RELKIND_COMPOSITE_TYPE:
342 case RELKIND_FOREIGN_TABLE:
343
344 /*
345 * Force reltablespace to zero if the relation has no physical
346 * storage. This is mainly just for cleanliness' sake.
347 *
348 * Partitioned tables and indexes don't have physical storage
349 * either, but we want to keep their tablespace settings so that
350 * their children can inherit it.
351 */
352 reltablespace = InvalidOid;
353 break;
354
355 case RELKIND_SEQUENCE:
356
357 /*
358 * Force reltablespace to zero for sequences, since we don't
359 * support moving them around into different tablespaces.
360 */
361 reltablespace = InvalidOid;
362 break;
363 default:
364 break;
365 }
366
367 /*
368 * Decide whether to create storage. If caller passed a valid relfilenode,
369 * storage is already created, so don't do it here. Also don't create it
370 * for relkinds without physical storage.
371 */
372 if (!RELKIND_HAS_STORAGE(relkind) || OidIsValid(relfilenode))
373 create_storage = false;
374 else
375 {
376 create_storage = true;
377 relfilenode = relid;
378 }
379
380 /*
381 * Never allow a pg_class entry to explicitly specify the database's
382 * default tablespace in reltablespace; force it to zero instead. This
383 * ensures that if the database is cloned with a different default
384 * tablespace, the pg_class entry will still match where CREATE DATABASE
385 * will put the physically copied relation.
386 *
387 * Yes, this is a bit of a hack.
388 */
389 if (reltablespace == MyDatabaseTableSpace)
390 reltablespace = InvalidOid;
391
392 /*
393 * build the relcache entry.
394 */
395 rel = RelationBuildLocalRelation(relname,
396 relnamespace,
397 tupDesc,
398 relid,
399 accessmtd,
400 relfilenode,
401 reltablespace,
402 shared_relation,
403 mapped_relation,
404 relpersistence,
405 relkind);
406
407 /*
408 * Have the storage manager create the relation's disk file, if needed.
409 *
410 * For relations the callback creates both the main and the init fork, for
411 * indexes only the main fork is created. The other forks will be created
412 * on demand.
413 */
414 if (create_storage)
415 {
416 RelationOpenSmgr(rel);
417
418 switch (rel->rd_rel->relkind)
419 {
420 case RELKIND_VIEW:
421 case RELKIND_COMPOSITE_TYPE:
422 case RELKIND_FOREIGN_TABLE:
423 case RELKIND_PARTITIONED_TABLE:
424 case RELKIND_PARTITIONED_INDEX:
425 Assert(false);
426 break;
427
428 case RELKIND_INDEX:
429 case RELKIND_SEQUENCE:
430 RelationCreateStorage(rel->rd_node, relpersistence);
431 break;
432
433 case RELKIND_RELATION:
434 case RELKIND_TOASTVALUE:
435 case RELKIND_MATVIEW:
436 table_relation_set_new_filenode(rel, &rel->rd_node,
437 relpersistence,
438 relfrozenxid, relminmxid);
439 break;
440 }
441 }
442
443 /*
444 * If a tablespace is specified, removal of that tablespace is normally
445 * protected by the existence of a physical file; but for relations with
446 * no files, add a pg_shdepend entry to account for that.
447 */
448 if (!create_storage && reltablespace != InvalidOid)
449 recordDependencyOnTablespace(RelationRelationId, relid,
450 reltablespace);
451
452 return rel;
453 }
454
455 /* ----------------------------------------------------------------
456 * heap_create_with_catalog - Create a cataloged relation
457 *
458 * this is done in multiple steps:
459 *
460 * 1) CheckAttributeNamesTypes() is used to make certain the tuple
461 * descriptor contains a valid set of attribute names and types
462 *
463 * 2) pg_class is opened and get_relname_relid()
464 * performs a scan to ensure that no relation with the
465 * same name already exists.
466 *
467 * 3) heap_create() is called to create the new relation on disk.
468 *
469 * 4) TypeCreate() is called to define a new type corresponding
470 * to the new relation.
471 *
472 * 5) AddNewRelationTuple() is called to register the
473 * relation in pg_class.
474 *
475 * 6) AddNewAttributeTuples() is called to register the
476 * new relation's schema in pg_attribute.
477 *
 *		7) StoreConstraints() is called - vadim 08/22/97
479 *
480 * 8) the relations are closed and the new relation's oid
481 * is returned.
482 *
483 * ----------------------------------------------------------------
484 */
485
486 /* --------------------------------
487 * CheckAttributeNamesTypes
488 *
489 * this is used to make certain the tuple descriptor contains a
490 * valid set of attribute names and datatypes. a problem simply
491 * generates ereport(ERROR) which aborts the current transaction.
492 *
493 * relkind is the relkind of the relation to be created.
494 * flags controls which datatypes are allowed, cf CheckAttributeType.
495 * --------------------------------
496 */
497 void
CheckAttributeNamesTypes(TupleDesc tupdesc,char relkind,int flags)498 CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
499 int flags)
500 {
501 int i;
502 int j;
503 int natts = tupdesc->natts;
504
505 /* Sanity check on column count */
506 if (natts < 0 || natts > MaxHeapAttributeNumber)
507 ereport(ERROR,
508 (errcode(ERRCODE_TOO_MANY_COLUMNS),
509 errmsg("tables can have at most %d columns",
510 MaxHeapAttributeNumber)));
511
512 /*
513 * first check for collision with system attribute names
514 *
515 * Skip this for a view or type relation, since those don't have system
516 * attributes.
517 */
518 if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
519 {
520 for (i = 0; i < natts; i++)
521 {
522 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
523
524 if (SystemAttributeByName(NameStr(attr->attname)) != NULL)
525 ereport(ERROR,
526 (errcode(ERRCODE_DUPLICATE_COLUMN),
527 errmsg("column name \"%s\" conflicts with a system column name",
528 NameStr(attr->attname))));
529 }
530 }
531
532 /*
533 * next check for repeated attribute names
534 */
535 for (i = 1; i < natts; i++)
536 {
537 for (j = 0; j < i; j++)
538 {
539 if (strcmp(NameStr(TupleDescAttr(tupdesc, j)->attname),
540 NameStr(TupleDescAttr(tupdesc, i)->attname)) == 0)
541 ereport(ERROR,
542 (errcode(ERRCODE_DUPLICATE_COLUMN),
543 errmsg("column name \"%s\" specified more than once",
544 NameStr(TupleDescAttr(tupdesc, j)->attname))));
545 }
546 }
547
548 /*
549 * next check the attribute types
550 */
551 for (i = 0; i < natts; i++)
552 {
553 CheckAttributeType(NameStr(TupleDescAttr(tupdesc, i)->attname),
554 TupleDescAttr(tupdesc, i)->atttypid,
555 TupleDescAttr(tupdesc, i)->attcollation,
556 NIL, /* assume we're creating a new rowtype */
557 flags);
558 }
559 }
560
561 /* --------------------------------
562 * CheckAttributeType
563 *
564 * Verify that the proposed datatype of an attribute is legal.
565 * This is needed mainly because there are types (and pseudo-types)
566 * in the catalogs that we do not support as elements of real tuples.
567 * We also check some other properties required of a table column.
568 *
569 * If the attribute is being proposed for addition to an existing table or
570 * composite type, pass a one-element list of the rowtype OID as
571 * containing_rowtypes. When checking a to-be-created rowtype, it's
572 * sufficient to pass NIL, because there could not be any recursive reference
573 * to a not-yet-existing rowtype.
574 *
575 * flags is a bitmask controlling which datatypes we allow. For the most
576 * part, pseudo-types are disallowed as attribute types, but there are some
577 * exceptions: ANYARRAYOID, RECORDOID, and RECORDARRAYOID can be allowed
578 * in some cases. (This works because values of those type classes are
579 * self-identifying to some extent. However, RECORDOID and RECORDARRAYOID
580 * are reliably identifiable only within a session, since the identity info
581 * may use a typmod that is only locally assigned. The caller is expected
582 * to know whether these cases are safe.)
583 *
584 * flags can also control the phrasing of the error messages. If
585 * CHKATYPE_IS_PARTKEY is specified, "attname" should be a partition key
586 * column number as text, not a real column name.
587 * --------------------------------
588 */
589 void
CheckAttributeType(const char * attname,Oid atttypid,Oid attcollation,List * containing_rowtypes,int flags)590 CheckAttributeType(const char *attname,
591 Oid atttypid, Oid attcollation,
592 List *containing_rowtypes,
593 int flags)
594 {
595 char att_typtype = get_typtype(atttypid);
596 Oid att_typelem;
597
598 if (att_typtype == TYPTYPE_PSEUDO)
599 {
600 /*
601 * We disallow pseudo-type columns, with the exception of ANYARRAY,
602 * RECORD, and RECORD[] when the caller says that those are OK.
603 *
604 * We don't need to worry about recursive containment for RECORD and
605 * RECORD[] because (a) no named composite type should be allowed to
606 * contain those, and (b) two "anonymous" record types couldn't be
607 * considered to be the same type, so infinite recursion isn't
608 * possible.
609 */
610 if (!((atttypid == ANYARRAYOID && (flags & CHKATYPE_ANYARRAY)) ||
611 (atttypid == RECORDOID && (flags & CHKATYPE_ANYRECORD)) ||
612 (atttypid == RECORDARRAYOID && (flags & CHKATYPE_ANYRECORD))))
613 {
614 if (flags & CHKATYPE_IS_PARTKEY)
615 ereport(ERROR,
616 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
617 /* translator: first %s is an integer not a name */
618 errmsg("partition key column %s has pseudo-type %s",
619 attname, format_type_be(atttypid))));
620 else
621 ereport(ERROR,
622 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
623 errmsg("column \"%s\" has pseudo-type %s",
624 attname, format_type_be(atttypid))));
625 }
626 }
627 else if (att_typtype == TYPTYPE_DOMAIN)
628 {
629 /*
630 * If it's a domain, recurse to check its base type.
631 */
632 CheckAttributeType(attname, getBaseType(atttypid), attcollation,
633 containing_rowtypes,
634 flags);
635 }
636 else if (att_typtype == TYPTYPE_COMPOSITE)
637 {
638 /*
639 * For a composite type, recurse into its attributes.
640 */
641 Relation relation;
642 TupleDesc tupdesc;
643 int i;
644
645 /*
646 * Check for self-containment. Eventually we might be able to allow
647 * this (just return without complaint, if so) but it's not clear how
648 * many other places would require anti-recursion defenses before it
649 * would be safe to allow tables to contain their own rowtype.
650 */
651 if (list_member_oid(containing_rowtypes, atttypid))
652 ereport(ERROR,
653 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
654 errmsg("composite type %s cannot be made a member of itself",
655 format_type_be(atttypid))));
656
657 containing_rowtypes = lappend_oid(containing_rowtypes, atttypid);
658
659 relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);
660
661 tupdesc = RelationGetDescr(relation);
662
663 for (i = 0; i < tupdesc->natts; i++)
664 {
665 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
666
667 if (attr->attisdropped)
668 continue;
669 CheckAttributeType(NameStr(attr->attname),
670 attr->atttypid, attr->attcollation,
671 containing_rowtypes,
672 flags & ~CHKATYPE_IS_PARTKEY);
673 }
674
675 relation_close(relation, AccessShareLock);
676
677 containing_rowtypes = list_delete_last(containing_rowtypes);
678 }
679 else if (att_typtype == TYPTYPE_RANGE)
680 {
681 /*
682 * If it's a range, recurse to check its subtype.
683 */
684 CheckAttributeType(attname, get_range_subtype(atttypid),
685 get_range_collation(atttypid),
686 containing_rowtypes,
687 flags);
688 }
689 else if (OidIsValid((att_typelem = get_element_type(atttypid))))
690 {
691 /*
692 * Must recurse into array types, too, in case they are composite.
693 */
694 CheckAttributeType(attname, att_typelem, attcollation,
695 containing_rowtypes,
696 flags);
697 }
698
699 /*
700 * This might not be strictly invalid per SQL standard, but it is pretty
701 * useless, and it cannot be dumped, so we must disallow it.
702 */
703 if (!OidIsValid(attcollation) && type_is_collatable(atttypid))
704 {
705 if (flags & CHKATYPE_IS_PARTKEY)
706 ereport(ERROR,
707 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
708 /* translator: first %s is an integer not a name */
709 errmsg("no collation was derived for partition key column %s with collatable type %s",
710 attname, format_type_be(atttypid)),
711 errhint("Use the COLLATE clause to set the collation explicitly.")));
712 else
713 ereport(ERROR,
714 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
715 errmsg("no collation was derived for column \"%s\" with collatable type %s",
716 attname, format_type_be(atttypid)),
717 errhint("Use the COLLATE clause to set the collation explicitly.")));
718 }
719 }
720
721 /*
722 * InsertPgAttributeTuple
723 * Construct and insert a new tuple in pg_attribute.
724 *
725 * Caller has already opened and locked pg_attribute. new_attribute is the
726 * attribute to insert. attcacheoff is always initialized to -1, attacl and
727 * attoptions are always initialized to NULL.
728 *
729 * indstate is the index state for CatalogTupleInsertWithInfo. It can be
730 * passed as NULL, in which case we'll fetch the necessary info. (Don't do
731 * this when inserting multiple attributes, because it's a tad more
732 * expensive.)
733 */
734 void
InsertPgAttributeTuple(Relation pg_attribute_rel,Form_pg_attribute new_attribute,Datum attoptions,CatalogIndexState indstate)735 InsertPgAttributeTuple(Relation pg_attribute_rel,
736 Form_pg_attribute new_attribute,
737 Datum attoptions,
738 CatalogIndexState indstate)
739 {
740 Datum values[Natts_pg_attribute];
741 bool nulls[Natts_pg_attribute];
742 HeapTuple tup;
743
744 /* This is a tad tedious, but way cleaner than what we used to do... */
745 memset(values, 0, sizeof(values));
746 memset(nulls, false, sizeof(nulls));
747
748 values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_attribute->attrelid);
749 values[Anum_pg_attribute_attname - 1] = NameGetDatum(&new_attribute->attname);
750 values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(new_attribute->atttypid);
751 values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(new_attribute->attstattarget);
752 values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(new_attribute->attlen);
753 values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(new_attribute->attnum);
754 values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(new_attribute->attndims);
755 values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(-1);
756 values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(new_attribute->atttypmod);
757 values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(new_attribute->attbyval);
758 values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(new_attribute->attstorage);
759 values[Anum_pg_attribute_attalign - 1] = CharGetDatum(new_attribute->attalign);
760 values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(new_attribute->attnotnull);
761 values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(new_attribute->atthasdef);
762 values[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(new_attribute->atthasmissing);
763 values[Anum_pg_attribute_attidentity - 1] = CharGetDatum(new_attribute->attidentity);
764 values[Anum_pg_attribute_attgenerated - 1] = CharGetDatum(new_attribute->attgenerated);
765 values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(new_attribute->attisdropped);
766 values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal);
767 values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount);
768 values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation);
769 values[Anum_pg_attribute_attoptions - 1] = attoptions;
770
771 /* start out with empty permissions and empty options */
772 nulls[Anum_pg_attribute_attacl - 1] = true;
773 nulls[Anum_pg_attribute_attoptions - 1] = attoptions == (Datum) 0;
774 nulls[Anum_pg_attribute_attfdwoptions - 1] = true;
775 nulls[Anum_pg_attribute_attmissingval - 1] = true;
776
777 tup = heap_form_tuple(RelationGetDescr(pg_attribute_rel), values, nulls);
778
779 /* finally insert the new tuple, update the indexes, and clean up */
780 if (indstate != NULL)
781 CatalogTupleInsertWithInfo(pg_attribute_rel, tup, indstate);
782 else
783 CatalogTupleInsert(pg_attribute_rel, tup);
784
785 heap_freetuple(tup);
786 }
787
788 /* --------------------------------
789 * AddNewAttributeTuples
790 *
791 * this registers the new relation's schema by adding
792 * tuples to pg_attribute.
793 * --------------------------------
794 */
795 static void
AddNewAttributeTuples(Oid new_rel_oid,TupleDesc tupdesc,char relkind)796 AddNewAttributeTuples(Oid new_rel_oid,
797 TupleDesc tupdesc,
798 char relkind)
799 {
800 Form_pg_attribute attr;
801 int i;
802 Relation rel;
803 CatalogIndexState indstate;
804 int natts = tupdesc->natts;
805 ObjectAddress myself,
806 referenced;
807
808 /*
809 * open pg_attribute and its indexes.
810 */
811 rel = table_open(AttributeRelationId, RowExclusiveLock);
812
813 indstate = CatalogOpenIndexes(rel);
814
815 /*
816 * First we add the user attributes. This is also a convenient place to
817 * add dependencies on their datatypes and collations.
818 */
819 for (i = 0; i < natts; i++)
820 {
821 attr = TupleDescAttr(tupdesc, i);
822 /* Fill in the correct relation OID */
823 attr->attrelid = new_rel_oid;
824 /* Make sure this is OK, too */
825 attr->attstattarget = -1;
826
827 InsertPgAttributeTuple(rel, attr, (Datum) 0, indstate);
828
829 /* Add dependency info */
830 myself.classId = RelationRelationId;
831 myself.objectId = new_rel_oid;
832 myself.objectSubId = i + 1;
833 referenced.classId = TypeRelationId;
834 referenced.objectId = attr->atttypid;
835 referenced.objectSubId = 0;
836 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
837
838 /* The default collation is pinned, so don't bother recording it */
839 if (OidIsValid(attr->attcollation) &&
840 attr->attcollation != DEFAULT_COLLATION_OID)
841 {
842 referenced.classId = CollationRelationId;
843 referenced.objectId = attr->attcollation;
844 referenced.objectSubId = 0;
845 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
846 }
847 }
848
849 /*
850 * Next we add the system attributes. Skip OID if rel has no OIDs. Skip
851 * all for a view or type relation. We don't bother with making datatype
852 * dependencies here, since presumably all these types are pinned.
853 */
854 if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
855 {
856 for (i = 0; i < (int) lengthof(SysAtt); i++)
857 {
858 FormData_pg_attribute attStruct;
859
860 memcpy(&attStruct, SysAtt[i], sizeof(FormData_pg_attribute));
861
862 /* Fill in the correct relation OID in the copied tuple */
863 attStruct.attrelid = new_rel_oid;
864
865 InsertPgAttributeTuple(rel, &attStruct, (Datum) 0, indstate);
866 }
867 }
868
869 /*
870 * clean up
871 */
872 CatalogCloseIndexes(indstate);
873
874 table_close(rel, RowExclusiveLock);
875 }
876
877 /* --------------------------------
878 * InsertPgClassTuple
879 *
880 * Construct and insert a new tuple in pg_class.
881 *
882 * Caller has already opened and locked pg_class.
883 * Tuple data is taken from new_rel_desc->rd_rel, except for the
884 * variable-width fields which are not present in a cached reldesc.
885 * relacl and reloptions are passed in Datum form (to avoid having
886 * to reference the data types in heap.h). Pass (Datum) 0 to set them
887 * to NULL.
888 * --------------------------------
889 */
890 void
InsertPgClassTuple(Relation pg_class_desc,Relation new_rel_desc,Oid new_rel_oid,Datum relacl,Datum reloptions)891 InsertPgClassTuple(Relation pg_class_desc,
892 Relation new_rel_desc,
893 Oid new_rel_oid,
894 Datum relacl,
895 Datum reloptions)
896 {
897 Form_pg_class rd_rel = new_rel_desc->rd_rel;
898 Datum values[Natts_pg_class];
899 bool nulls[Natts_pg_class];
900 HeapTuple tup;
901
902 /* This is a tad tedious, but way cleaner than what we used to do... */
903 memset(values, 0, sizeof(values));
904 memset(nulls, false, sizeof(nulls));
905
906 values[Anum_pg_class_oid - 1] = ObjectIdGetDatum(new_rel_oid);
907 values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
908 values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
909 values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
910 values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
911 values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
912 values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
913 values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
914 values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(rd_rel->reltablespace);
915 values[Anum_pg_class_relpages - 1] = Int32GetDatum(rd_rel->relpages);
916 values[Anum_pg_class_reltuples - 1] = Float4GetDatum(rd_rel->reltuples);
917 values[Anum_pg_class_relallvisible - 1] = Int32GetDatum(rd_rel->relallvisible);
918 values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(rd_rel->reltoastrelid);
919 values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
920 values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
921 values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
922 values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
923 values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
924 values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
925 values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
926 values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
927 values[Anum_pg_class_relrowsecurity - 1] = BoolGetDatum(rd_rel->relrowsecurity);
928 values[Anum_pg_class_relforcerowsecurity - 1] = BoolGetDatum(rd_rel->relforcerowsecurity);
929 values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
930 values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
931 values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
932 values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition);
933 values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite);
934 values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
935 values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
936 if (relacl != (Datum) 0)
937 values[Anum_pg_class_relacl - 1] = relacl;
938 else
939 nulls[Anum_pg_class_relacl - 1] = true;
940 if (reloptions != (Datum) 0)
941 values[Anum_pg_class_reloptions - 1] = reloptions;
942 else
943 nulls[Anum_pg_class_reloptions - 1] = true;
944
945 /* relpartbound is set by updating this tuple, if necessary */
946 nulls[Anum_pg_class_relpartbound - 1] = true;
947
948 tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
949
950 /* finally insert the new tuple, update the indexes, and clean up */
951 CatalogTupleInsert(pg_class_desc, tup);
952
953 heap_freetuple(tup);
954 }
955
956 /* --------------------------------
957 * AddNewRelationTuple
958 *
959 * this registers the new relation in the catalogs by
960 * adding a tuple to pg_class.
961 * --------------------------------
962 */
963 static void
AddNewRelationTuple(Relation pg_class_desc,Relation new_rel_desc,Oid new_rel_oid,Oid new_type_oid,Oid reloftype,Oid relowner,char relkind,TransactionId relfrozenxid,TransactionId relminmxid,Datum relacl,Datum reloptions)964 AddNewRelationTuple(Relation pg_class_desc,
965 Relation new_rel_desc,
966 Oid new_rel_oid,
967 Oid new_type_oid,
968 Oid reloftype,
969 Oid relowner,
970 char relkind,
971 TransactionId relfrozenxid,
972 TransactionId relminmxid,
973 Datum relacl,
974 Datum reloptions)
975 {
976 Form_pg_class new_rel_reltup;
977
978 /*
979 * first we update some of the information in our uncataloged relation's
980 * relation descriptor.
981 */
982 new_rel_reltup = new_rel_desc->rd_rel;
983
984 switch (relkind)
985 {
986 case RELKIND_RELATION:
987 case RELKIND_MATVIEW:
988 case RELKIND_INDEX:
989 case RELKIND_TOASTVALUE:
990 /* The relation is real, but as yet empty */
991 new_rel_reltup->relpages = 0;
992 new_rel_reltup->reltuples = 0;
993 new_rel_reltup->relallvisible = 0;
994 break;
995 case RELKIND_SEQUENCE:
996 /* Sequences always have a known size */
997 new_rel_reltup->relpages = 1;
998 new_rel_reltup->reltuples = 1;
999 new_rel_reltup->relallvisible = 0;
1000 break;
1001 default:
1002 /* Views, etc, have no disk storage */
1003 new_rel_reltup->relpages = 0;
1004 new_rel_reltup->reltuples = 0;
1005 new_rel_reltup->relallvisible = 0;
1006 break;
1007 }
1008
1009 new_rel_reltup->relfrozenxid = relfrozenxid;
1010 new_rel_reltup->relminmxid = relminmxid;
1011 new_rel_reltup->relowner = relowner;
1012 new_rel_reltup->reltype = new_type_oid;
1013 new_rel_reltup->reloftype = reloftype;
1014
1015 /* relispartition is always set by updating this tuple later */
1016 new_rel_reltup->relispartition = false;
1017
1018 new_rel_desc->rd_att->tdtypeid = new_type_oid;
1019
1020 /* Now build and insert the tuple */
1021 InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
1022 relacl, reloptions);
1023 }
1024
1025
1026 /* --------------------------------
1027 * AddNewRelationType -
1028 *
1029 * define a composite type corresponding to the new relation
1030 * --------------------------------
1031 */
1032 static ObjectAddress
AddNewRelationType(const char * typeName,Oid typeNamespace,Oid new_rel_oid,char new_rel_kind,Oid ownerid,Oid new_row_type,Oid new_array_type)1033 AddNewRelationType(const char *typeName,
1034 Oid typeNamespace,
1035 Oid new_rel_oid,
1036 char new_rel_kind,
1037 Oid ownerid,
1038 Oid new_row_type,
1039 Oid new_array_type)
1040 {
1041 return
1042 TypeCreate(new_row_type, /* optional predetermined OID */
1043 typeName, /* type name */
1044 typeNamespace, /* type namespace */
1045 new_rel_oid, /* relation oid */
1046 new_rel_kind, /* relation kind */
1047 ownerid, /* owner's ID */
1048 -1, /* internal size (varlena) */
1049 TYPTYPE_COMPOSITE, /* type-type (composite) */
1050 TYPCATEGORY_COMPOSITE, /* type-category (ditto) */
1051 false, /* composite types are never preferred */
1052 DEFAULT_TYPDELIM, /* default array delimiter */
1053 F_RECORD_IN, /* input procedure */
1054 F_RECORD_OUT, /* output procedure */
1055 F_RECORD_RECV, /* receive procedure */
1056 F_RECORD_SEND, /* send procedure */
1057 InvalidOid, /* typmodin procedure - none */
1058 InvalidOid, /* typmodout procedure - none */
1059 InvalidOid, /* analyze procedure - default */
1060 InvalidOid, /* array element type - irrelevant */
1061 false, /* this is not an array type */
1062 new_array_type, /* array type if any */
1063 InvalidOid, /* domain base type - irrelevant */
1064 NULL, /* default value - none */
1065 NULL, /* default binary representation */
1066 false, /* passed by reference */
1067 TYPALIGN_DOUBLE, /* alignment - must be the largest! */
1068 TYPSTORAGE_EXTENDED, /* fully TOASTable */
1069 -1, /* typmod */
1070 0, /* array dimensions for typBaseType */
1071 false, /* Type NOT NULL */
1072 InvalidOid); /* rowtypes never have a collation */
1073 }
1074
1075 /* --------------------------------
1076 * heap_create_with_catalog
1077 *
1078 * creates a new cataloged relation. see comments above.
1079 *
1080 * Arguments:
1081 * relname: name to give to new rel
1082 * relnamespace: OID of namespace it goes in
1083 * reltablespace: OID of tablespace it goes in
1084 * relid: OID to assign to new rel, or InvalidOid to select a new OID
1085 * reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
1086 * reloftypeid: if a typed table, OID of underlying type; else InvalidOid
1087 * ownerid: OID of new rel's owner
1088 * tupdesc: tuple descriptor (source of column definitions)
1089 * cooked_constraints: list of precooked check constraints and defaults
1090 * relkind: relkind for new rel
1091 * relpersistence: rel's persistence status (permanent, temp, or unlogged)
1092 * shared_relation: true if it's to be a shared relation
1093 * mapped_relation: true if the relation will use the relfilenode map
1094 * oncommit: ON COMMIT marking (only relevant if it's a temp table)
1095 * reloptions: reloptions in Datum form, or (Datum) 0 if none
1096 * use_user_acl: true if should look for user-defined default permissions;
1097 * if false, relacl is always set NULL
1098 * allow_system_table_mods: true to allow creation in system namespaces
1099 * is_internal: is this a system-generated catalog?
1100 *
1101 * Output parameters:
1102 * typaddress: if not null, gets the object address of the new pg_type entry
1103 *
1104 * Returns the OID of the new relation
1105 * --------------------------------
1106 */
1107 Oid
heap_create_with_catalog(const char * relname,Oid relnamespace,Oid reltablespace,Oid relid,Oid reltypeid,Oid reloftypeid,Oid ownerid,Oid accessmtd,TupleDesc tupdesc,List * cooked_constraints,char relkind,char relpersistence,bool shared_relation,bool mapped_relation,OnCommitAction oncommit,Datum reloptions,bool use_user_acl,bool allow_system_table_mods,bool is_internal,Oid relrewrite,ObjectAddress * typaddress)1108 heap_create_with_catalog(const char *relname,
1109 Oid relnamespace,
1110 Oid reltablespace,
1111 Oid relid,
1112 Oid reltypeid,
1113 Oid reloftypeid,
1114 Oid ownerid,
1115 Oid accessmtd,
1116 TupleDesc tupdesc,
1117 List *cooked_constraints,
1118 char relkind,
1119 char relpersistence,
1120 bool shared_relation,
1121 bool mapped_relation,
1122 OnCommitAction oncommit,
1123 Datum reloptions,
1124 bool use_user_acl,
1125 bool allow_system_table_mods,
1126 bool is_internal,
1127 Oid relrewrite,
1128 ObjectAddress *typaddress)
1129 {
1130 Relation pg_class_desc;
1131 Relation new_rel_desc;
1132 Acl *relacl;
1133 Oid existing_relid;
1134 Oid old_type_oid;
1135 Oid new_type_oid;
1136 ObjectAddress new_type_addr;
1137 Oid new_array_oid = InvalidOid;
1138 TransactionId relfrozenxid;
1139 MultiXactId relminmxid;
1140
1141 pg_class_desc = table_open(RelationRelationId, RowExclusiveLock);
1142
1143 /*
1144 * sanity checks
1145 */
1146 Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
1147
1148 /*
1149 * Validate proposed tupdesc for the desired relkind. If
1150 * allow_system_table_mods is on, allow ANYARRAY to be used; this is a
1151 * hack to allow creating pg_statistic and cloning it during VACUUM FULL.
1152 */
1153 CheckAttributeNamesTypes(tupdesc, relkind,
1154 allow_system_table_mods ? CHKATYPE_ANYARRAY : 0);
1155
1156 /*
1157 * This would fail later on anyway, if the relation already exists. But
1158 * by catching it here we can emit a nicer error message.
1159 */
1160 existing_relid = get_relname_relid(relname, relnamespace);
1161 if (existing_relid != InvalidOid)
1162 ereport(ERROR,
1163 (errcode(ERRCODE_DUPLICATE_TABLE),
1164 errmsg("relation \"%s\" already exists", relname)));
1165
1166 /*
1167 * Since we are going to create a rowtype as well, also check for
1168 * collision with an existing type name. If there is one and it's an
1169 * autogenerated array, we can rename it out of the way; otherwise we can
1170 * at least give a good error message.
1171 */
1172 old_type_oid = GetSysCacheOid2(TYPENAMENSP, Anum_pg_type_oid,
1173 CStringGetDatum(relname),
1174 ObjectIdGetDatum(relnamespace));
1175 if (OidIsValid(old_type_oid))
1176 {
1177 if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
1178 ereport(ERROR,
1179 (errcode(ERRCODE_DUPLICATE_OBJECT),
1180 errmsg("type \"%s\" already exists", relname),
1181 errhint("A relation has an associated type of the same name, "
1182 "so you must use a name that doesn't conflict "
1183 "with any existing type.")));
1184 }
1185
1186 /*
1187 * Shared relations must be in pg_global (last-ditch check)
1188 */
1189 if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
1190 elog(ERROR, "shared relations must be placed in pg_global tablespace");
1191
1192 /*
1193 * Allocate an OID for the relation, unless we were told what to use.
1194 *
1195 * The OID will be the relfilenode as well, so make sure it doesn't
1196 * collide with either pg_class OIDs or existing physical files.
1197 */
1198 if (!OidIsValid(relid))
1199 {
1200 /* Use binary-upgrade override for pg_class.oid/relfilenode? */
1201 if (IsBinaryUpgrade &&
1202 (relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
1203 relkind == RELKIND_VIEW || relkind == RELKIND_MATVIEW ||
1204 relkind == RELKIND_COMPOSITE_TYPE || relkind == RELKIND_FOREIGN_TABLE ||
1205 relkind == RELKIND_PARTITIONED_TABLE))
1206 {
1207 if (!OidIsValid(binary_upgrade_next_heap_pg_class_oid))
1208 ereport(ERROR,
1209 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1210 errmsg("pg_class heap OID value not set when in binary upgrade mode")));
1211
1212 relid = binary_upgrade_next_heap_pg_class_oid;
1213 binary_upgrade_next_heap_pg_class_oid = InvalidOid;
1214 }
1215 /* There might be no TOAST table, so we have to test for it. */
1216 else if (IsBinaryUpgrade &&
1217 OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
1218 relkind == RELKIND_TOASTVALUE)
1219 {
1220 relid = binary_upgrade_next_toast_pg_class_oid;
1221 binary_upgrade_next_toast_pg_class_oid = InvalidOid;
1222 }
1223 else
1224 relid = GetNewRelFileNode(reltablespace, pg_class_desc,
1225 relpersistence);
1226 }
1227
1228 /*
1229 * Determine the relation's initial permissions.
1230 */
1231 if (use_user_acl)
1232 {
1233 switch (relkind)
1234 {
1235 case RELKIND_RELATION:
1236 case RELKIND_VIEW:
1237 case RELKIND_MATVIEW:
1238 case RELKIND_FOREIGN_TABLE:
1239 case RELKIND_PARTITIONED_TABLE:
1240 relacl = get_user_default_acl(OBJECT_TABLE, ownerid,
1241 relnamespace);
1242 break;
1243 case RELKIND_SEQUENCE:
1244 relacl = get_user_default_acl(OBJECT_SEQUENCE, ownerid,
1245 relnamespace);
1246 break;
1247 default:
1248 relacl = NULL;
1249 break;
1250 }
1251 }
1252 else
1253 relacl = NULL;
1254
1255 /*
1256 * Create the relcache entry (mostly dummy at this point) and the physical
1257 * disk file. (If we fail further down, it's the smgr's responsibility to
1258 * remove the disk file again.)
1259 */
1260 new_rel_desc = heap_create(relname,
1261 relnamespace,
1262 reltablespace,
1263 relid,
1264 InvalidOid,
1265 accessmtd,
1266 tupdesc,
1267 relkind,
1268 relpersistence,
1269 shared_relation,
1270 mapped_relation,
1271 allow_system_table_mods,
1272 &relfrozenxid,
1273 &relminmxid);
1274
1275 Assert(relid == RelationGetRelid(new_rel_desc));
1276
1277 new_rel_desc->rd_rel->relrewrite = relrewrite;
1278
1279 /*
1280 * Decide whether to create an array type over the relation's rowtype. We
1281 * do not create any array types for system catalogs (ie, those made
1282 * during initdb). We do not create them where the use of a relation as
1283 * such is an implementation detail: toast tables, sequences and indexes.
1284 */
1285 if (IsUnderPostmaster && (relkind == RELKIND_RELATION ||
1286 relkind == RELKIND_VIEW ||
1287 relkind == RELKIND_MATVIEW ||
1288 relkind == RELKIND_FOREIGN_TABLE ||
1289 relkind == RELKIND_COMPOSITE_TYPE ||
1290 relkind == RELKIND_PARTITIONED_TABLE))
1291 new_array_oid = AssignTypeArrayOid();
1292
1293 /*
1294 * Since defining a relation also defines a complex type, we add a new
1295 * system type corresponding to the new relation. The OID of the type can
1296 * be preselected by the caller, but if reltypeid is InvalidOid, we'll
1297 * generate a new OID for it.
1298 *
1299 * NOTE: we could get a unique-index failure here, in case someone else is
1300 * creating the same type name in parallel but hadn't committed yet when
1301 * we checked for a duplicate name above.
1302 */
1303 new_type_addr = AddNewRelationType(relname,
1304 relnamespace,
1305 relid,
1306 relkind,
1307 ownerid,
1308 reltypeid,
1309 new_array_oid);
1310 new_type_oid = new_type_addr.objectId;
1311 if (typaddress)
1312 *typaddress = new_type_addr;
1313
1314 /*
1315 * Now make the array type if wanted.
1316 */
1317 if (OidIsValid(new_array_oid))
1318 {
1319 char *relarrayname;
1320
1321 relarrayname = makeArrayTypeName(relname, relnamespace);
1322
1323 TypeCreate(new_array_oid, /* force the type's OID to this */
1324 relarrayname, /* Array type name */
1325 relnamespace, /* Same namespace as parent */
1326 InvalidOid, /* Not composite, no relationOid */
1327 0, /* relkind, also N/A here */
1328 ownerid, /* owner's ID */
1329 -1, /* Internal size (varlena) */
1330 TYPTYPE_BASE, /* Not composite - typelem is */
1331 TYPCATEGORY_ARRAY, /* type-category (array) */
1332 false, /* array types are never preferred */
1333 DEFAULT_TYPDELIM, /* default array delimiter */
1334 F_ARRAY_IN, /* array input proc */
1335 F_ARRAY_OUT, /* array output proc */
1336 F_ARRAY_RECV, /* array recv (bin) proc */
1337 F_ARRAY_SEND, /* array send (bin) proc */
1338 InvalidOid, /* typmodin procedure - none */
1339 InvalidOid, /* typmodout procedure - none */
1340 F_ARRAY_TYPANALYZE, /* array analyze procedure */
1341 new_type_oid, /* array element type - the rowtype */
1342 true, /* yes, this is an array type */
1343 InvalidOid, /* this has no array type */
1344 InvalidOid, /* domain base type - irrelevant */
1345 NULL, /* default value - none */
1346 NULL, /* default binary representation */
1347 false, /* passed by reference */
1348 TYPALIGN_DOUBLE, /* alignment - must be the largest! */
1349 TYPSTORAGE_EXTENDED, /* fully TOASTable */
1350 -1, /* typmod */
1351 0, /* array dimensions for typBaseType */
1352 false, /* Type NOT NULL */
1353 InvalidOid); /* rowtypes never have a collation */
1354
1355 pfree(relarrayname);
1356 }
1357
1358 /*
1359 * now create an entry in pg_class for the relation.
1360 *
1361 * NOTE: we could get a unique-index failure here, in case someone else is
1362 * creating the same relation name in parallel but hadn't committed yet
1363 * when we checked for a duplicate name above.
1364 */
1365 AddNewRelationTuple(pg_class_desc,
1366 new_rel_desc,
1367 relid,
1368 new_type_oid,
1369 reloftypeid,
1370 ownerid,
1371 relkind,
1372 relfrozenxid,
1373 relminmxid,
1374 PointerGetDatum(relacl),
1375 reloptions);
1376
1377 /*
1378 * now add tuples to pg_attribute for the attributes in our new relation.
1379 */
1380 AddNewAttributeTuples(relid, new_rel_desc->rd_att, relkind);
1381
1382 /*
1383 * Make a dependency link to force the relation to be deleted if its
1384 * namespace is. Also make a dependency link to its owner, as well as
1385 * dependencies for any roles mentioned in the default ACL.
1386 *
1387 * For composite types, these dependencies are tracked for the pg_type
1388 * entry, so we needn't record them here. Likewise, TOAST tables don't
1389 * need a namespace dependency (they live in a pinned namespace) nor an
1390 * owner dependency (they depend indirectly through the parent table), nor
1391 * should they have any ACL entries. The same applies for extension
1392 * dependencies.
1393 *
1394 * Also, skip this in bootstrap mode, since we don't make dependencies
1395 * while bootstrapping.
1396 */
1397 if (relkind != RELKIND_COMPOSITE_TYPE &&
1398 relkind != RELKIND_TOASTVALUE &&
1399 !IsBootstrapProcessingMode())
1400 {
1401 ObjectAddress myself,
1402 referenced;
1403
1404 myself.classId = RelationRelationId;
1405 myself.objectId = relid;
1406 myself.objectSubId = 0;
1407
1408 referenced.classId = NamespaceRelationId;
1409 referenced.objectId = relnamespace;
1410 referenced.objectSubId = 0;
1411 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1412
1413 recordDependencyOnOwner(RelationRelationId, relid, ownerid);
1414
1415 recordDependencyOnNewAcl(RelationRelationId, relid, 0, ownerid, relacl);
1416
1417 recordDependencyOnCurrentExtension(&myself, false);
1418
1419 if (reloftypeid)
1420 {
1421 referenced.classId = TypeRelationId;
1422 referenced.objectId = reloftypeid;
1423 referenced.objectSubId = 0;
1424 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1425 }
1426
1427 /*
1428 * Make a dependency link to force the relation to be deleted if its
1429 * access method is. Do this only for relation and materialized views.
1430 *
1431 * No need to add an explicit dependency for the toast table, as the
1432 * main table depends on it.
1433 */
1434 if (relkind == RELKIND_RELATION ||
1435 relkind == RELKIND_MATVIEW)
1436 {
1437 referenced.classId = AccessMethodRelationId;
1438 referenced.objectId = accessmtd;
1439 referenced.objectSubId = 0;
1440 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1441 }
1442 }
1443
1444 /* Post creation hook for new relation */
1445 InvokeObjectPostCreateHookArg(RelationRelationId, relid, 0, is_internal);
1446
1447 /*
1448 * Store any supplied constraints and defaults.
1449 *
1450 * NB: this may do a CommandCounterIncrement and rebuild the relcache
1451 * entry, so the relation must be valid and self-consistent at this point.
1452 * In particular, there are not yet constraints and defaults anywhere.
1453 */
1454 StoreConstraints(new_rel_desc, cooked_constraints, is_internal);
1455
1456 /*
1457 * If there's a special on-commit action, remember it
1458 */
1459 if (oncommit != ONCOMMIT_NOOP)
1460 register_on_commit_action(relid, oncommit);
1461
1462 /*
1463 * ok, the relation has been cataloged, so close our relations and return
1464 * the OID of the newly created relation.
1465 */
1466 table_close(new_rel_desc, NoLock); /* do not unlock till end of xact */
1467 table_close(pg_class_desc, RowExclusiveLock);
1468
1469 return relid;
1470 }
1471
1472 /*
1473 * RelationRemoveInheritance
1474 *
1475 * Formerly, this routine checked for child relations and aborted the
1476 * deletion if any were found. Now we rely on the dependency mechanism
1477 * to check for or delete child relations. By the time we get here,
1478 * there are no children and we need only remove any pg_inherits rows
1479 * linking this relation to its parent(s).
1480 */
1481 static void
RelationRemoveInheritance(Oid relid)1482 RelationRemoveInheritance(Oid relid)
1483 {
1484 Relation catalogRelation;
1485 SysScanDesc scan;
1486 ScanKeyData key;
1487 HeapTuple tuple;
1488
1489 catalogRelation = table_open(InheritsRelationId, RowExclusiveLock);
1490
1491 ScanKeyInit(&key,
1492 Anum_pg_inherits_inhrelid,
1493 BTEqualStrategyNumber, F_OIDEQ,
1494 ObjectIdGetDatum(relid));
1495
1496 scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true,
1497 NULL, 1, &key);
1498
1499 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1500 CatalogTupleDelete(catalogRelation, &tuple->t_self);
1501
1502 systable_endscan(scan);
1503 table_close(catalogRelation, RowExclusiveLock);
1504 }
1505
1506 /*
1507 * DeleteRelationTuple
1508 *
1509 * Remove pg_class row for the given relid.
1510 *
1511 * Note: this is shared by relation deletion and index deletion. It's
1512 * not intended for use anyplace else.
1513 */
1514 void
DeleteRelationTuple(Oid relid)1515 DeleteRelationTuple(Oid relid)
1516 {
1517 Relation pg_class_desc;
1518 HeapTuple tup;
1519
1520 /* Grab an appropriate lock on the pg_class relation */
1521 pg_class_desc = table_open(RelationRelationId, RowExclusiveLock);
1522
1523 tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1524 if (!HeapTupleIsValid(tup))
1525 elog(ERROR, "cache lookup failed for relation %u", relid);
1526
1527 /* delete the relation tuple from pg_class, and finish up */
1528 CatalogTupleDelete(pg_class_desc, &tup->t_self);
1529
1530 ReleaseSysCache(tup);
1531
1532 table_close(pg_class_desc, RowExclusiveLock);
1533 }
1534
1535 /*
1536 * DeleteAttributeTuples
1537 *
1538 * Remove pg_attribute rows for the given relid.
1539 *
1540 * Note: this is shared by relation deletion and index deletion. It's
1541 * not intended for use anyplace else.
1542 */
1543 void
DeleteAttributeTuples(Oid relid)1544 DeleteAttributeTuples(Oid relid)
1545 {
1546 Relation attrel;
1547 SysScanDesc scan;
1548 ScanKeyData key[1];
1549 HeapTuple atttup;
1550
1551 /* Grab an appropriate lock on the pg_attribute relation */
1552 attrel = table_open(AttributeRelationId, RowExclusiveLock);
1553
1554 /* Use the index to scan only attributes of the target relation */
1555 ScanKeyInit(&key[0],
1556 Anum_pg_attribute_attrelid,
1557 BTEqualStrategyNumber, F_OIDEQ,
1558 ObjectIdGetDatum(relid));
1559
1560 scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1561 NULL, 1, key);
1562
1563 /* Delete all the matching tuples */
1564 while ((atttup = systable_getnext(scan)) != NULL)
1565 CatalogTupleDelete(attrel, &atttup->t_self);
1566
1567 /* Clean up after the scan */
1568 systable_endscan(scan);
1569 table_close(attrel, RowExclusiveLock);
1570 }
1571
1572 /*
1573 * DeleteSystemAttributeTuples
1574 *
1575 * Remove pg_attribute rows for system columns of the given relid.
1576 *
1577 * Note: this is only used when converting a table to a view. Views don't
1578 * have system columns, so we should remove them from pg_attribute.
1579 */
1580 void
DeleteSystemAttributeTuples(Oid relid)1581 DeleteSystemAttributeTuples(Oid relid)
1582 {
1583 Relation attrel;
1584 SysScanDesc scan;
1585 ScanKeyData key[2];
1586 HeapTuple atttup;
1587
1588 /* Grab an appropriate lock on the pg_attribute relation */
1589 attrel = table_open(AttributeRelationId, RowExclusiveLock);
1590
1591 /* Use the index to scan only system attributes of the target relation */
1592 ScanKeyInit(&key[0],
1593 Anum_pg_attribute_attrelid,
1594 BTEqualStrategyNumber, F_OIDEQ,
1595 ObjectIdGetDatum(relid));
1596 ScanKeyInit(&key[1],
1597 Anum_pg_attribute_attnum,
1598 BTLessEqualStrategyNumber, F_INT2LE,
1599 Int16GetDatum(0));
1600
1601 scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1602 NULL, 2, key);
1603
1604 /* Delete all the matching tuples */
1605 while ((atttup = systable_getnext(scan)) != NULL)
1606 CatalogTupleDelete(attrel, &atttup->t_self);
1607
1608 /* Clean up after the scan */
1609 systable_endscan(scan);
1610 table_close(attrel, RowExclusiveLock);
1611 }
1612
1613 /*
1614 * RemoveAttributeById
1615 *
1616 * This is the guts of ALTER TABLE DROP COLUMN: actually mark the attribute
1617 * deleted in pg_attribute. We also remove pg_statistic entries for it.
1618 * (Everything else needed, such as getting rid of any pg_attrdef entry,
1619 * is handled by dependency.c.)
1620 */
1621 void
RemoveAttributeById(Oid relid,AttrNumber attnum)1622 RemoveAttributeById(Oid relid, AttrNumber attnum)
1623 {
1624 Relation rel;
1625 Relation attr_rel;
1626 HeapTuple tuple;
1627 Form_pg_attribute attStruct;
1628 char newattname[NAMEDATALEN];
1629
1630 /*
1631 * Grab an exclusive lock on the target table, which we will NOT release
1632 * until end of transaction. (In the simple case where we are directly
1633 * dropping this column, ATExecDropColumn already did this ... but when
1634 * cascading from a drop of some other object, we may not have any lock.)
1635 */
1636 rel = relation_open(relid, AccessExclusiveLock);
1637
1638 attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
1639
1640 tuple = SearchSysCacheCopy2(ATTNUM,
1641 ObjectIdGetDatum(relid),
1642 Int16GetDatum(attnum));
1643 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
1644 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1645 attnum, relid);
1646 attStruct = (Form_pg_attribute) GETSTRUCT(tuple);
1647
1648 if (attnum < 0)
1649 {
1650 /* System attribute (probably OID) ... just delete the row */
1651
1652 CatalogTupleDelete(attr_rel, &tuple->t_self);
1653 }
1654 else
1655 {
1656 /* Dropping user attributes is lots harder */
1657
1658 /* Mark the attribute as dropped */
1659 attStruct->attisdropped = true;
1660
1661 /*
1662 * Set the type OID to invalid. A dropped attribute's type link
1663 * cannot be relied on (once the attribute is dropped, the type might
1664 * be too). Fortunately we do not need the type row --- the only
1665 * really essential information is the type's typlen and typalign,
1666 * which are preserved in the attribute's attlen and attalign. We set
1667 * atttypid to zero here as a means of catching code that incorrectly
1668 * expects it to be valid.
1669 */
1670 attStruct->atttypid = InvalidOid;
1671
1672 /* Remove any NOT NULL constraint the column may have */
1673 attStruct->attnotnull = false;
1674
1675 /* We don't want to keep stats for it anymore */
1676 attStruct->attstattarget = 0;
1677
1678 /* Unset this so no one tries to look up the generation expression */
1679 attStruct->attgenerated = '\0';
1680
1681 /*
1682 * Change the column name to something that isn't likely to conflict
1683 */
1684 snprintf(newattname, sizeof(newattname),
1685 "........pg.dropped.%d........", attnum);
1686 namestrcpy(&(attStruct->attname), newattname);
1687
1688 /* clear the missing value if any */
1689 if (attStruct->atthasmissing)
1690 {
1691 Datum valuesAtt[Natts_pg_attribute];
1692 bool nullsAtt[Natts_pg_attribute];
1693 bool replacesAtt[Natts_pg_attribute];
1694
1695 /* update the tuple - set atthasmissing and attmissingval */
1696 MemSet(valuesAtt, 0, sizeof(valuesAtt));
1697 MemSet(nullsAtt, false, sizeof(nullsAtt));
1698 MemSet(replacesAtt, false, sizeof(replacesAtt));
1699
1700 valuesAtt[Anum_pg_attribute_atthasmissing - 1] =
1701 BoolGetDatum(false);
1702 replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
1703 valuesAtt[Anum_pg_attribute_attmissingval - 1] = (Datum) 0;
1704 nullsAtt[Anum_pg_attribute_attmissingval - 1] = true;
1705 replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
1706
1707 tuple = heap_modify_tuple(tuple, RelationGetDescr(attr_rel),
1708 valuesAtt, nullsAtt, replacesAtt);
1709 }
1710
1711 CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
1712 }
1713
1714 /*
1715 * Because updating the pg_attribute row will trigger a relcache flush for
1716 * the target relation, we need not do anything else to notify other
1717 * backends of the change.
1718 */
1719
1720 table_close(attr_rel, RowExclusiveLock);
1721
1722 if (attnum > 0)
1723 RemoveStatistics(relid, attnum);
1724
1725 relation_close(rel, NoLock);
1726 }
1727
1728 /*
1729 * RemoveAttrDefault
1730 *
1731 * If the specified relation/attribute has a default, remove it.
1732 * (If no default, raise error if complain is true, else return quietly.)
1733 */
1734 void
RemoveAttrDefault(Oid relid,AttrNumber attnum,DropBehavior behavior,bool complain,bool internal)1735 RemoveAttrDefault(Oid relid, AttrNumber attnum,
1736 DropBehavior behavior, bool complain, bool internal)
1737 {
1738 Relation attrdef_rel;
1739 ScanKeyData scankeys[2];
1740 SysScanDesc scan;
1741 HeapTuple tuple;
1742 bool found = false;
1743
1744 attrdef_rel = table_open(AttrDefaultRelationId, RowExclusiveLock);
1745
1746 ScanKeyInit(&scankeys[0],
1747 Anum_pg_attrdef_adrelid,
1748 BTEqualStrategyNumber, F_OIDEQ,
1749 ObjectIdGetDatum(relid));
1750 ScanKeyInit(&scankeys[1],
1751 Anum_pg_attrdef_adnum,
1752 BTEqualStrategyNumber, F_INT2EQ,
1753 Int16GetDatum(attnum));
1754
1755 scan = systable_beginscan(attrdef_rel, AttrDefaultIndexId, true,
1756 NULL, 2, scankeys);
1757
1758 /* There should be at most one matching tuple, but we loop anyway */
1759 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1760 {
1761 ObjectAddress object;
1762 Form_pg_attrdef attrtuple = (Form_pg_attrdef) GETSTRUCT(tuple);
1763
1764 object.classId = AttrDefaultRelationId;
1765 object.objectId = attrtuple->oid;
1766 object.objectSubId = 0;
1767
1768 performDeletion(&object, behavior,
1769 internal ? PERFORM_DELETION_INTERNAL : 0);
1770
1771 found = true;
1772 }
1773
1774 systable_endscan(scan);
1775 table_close(attrdef_rel, RowExclusiveLock);
1776
1777 if (complain && !found)
1778 elog(ERROR, "could not find attrdef tuple for relation %u attnum %d",
1779 relid, attnum);
1780 }
1781
1782 /*
1783 * RemoveAttrDefaultById
1784 *
1785 * Remove a pg_attrdef entry specified by OID. This is the guts of
1786 * attribute-default removal. Note it should be called via performDeletion,
1787 * not directly.
1788 */
1789 void
RemoveAttrDefaultById(Oid attrdefId)1790 RemoveAttrDefaultById(Oid attrdefId)
1791 {
1792 Relation attrdef_rel;
1793 Relation attr_rel;
1794 Relation myrel;
1795 ScanKeyData scankeys[1];
1796 SysScanDesc scan;
1797 HeapTuple tuple;
1798 Oid myrelid;
1799 AttrNumber myattnum;
1800
1801 /* Grab an appropriate lock on the pg_attrdef relation */
1802 attrdef_rel = table_open(AttrDefaultRelationId, RowExclusiveLock);
1803
1804 /* Find the pg_attrdef tuple */
1805 ScanKeyInit(&scankeys[0],
1806 Anum_pg_attrdef_oid,
1807 BTEqualStrategyNumber, F_OIDEQ,
1808 ObjectIdGetDatum(attrdefId));
1809
1810 scan = systable_beginscan(attrdef_rel, AttrDefaultOidIndexId, true,
1811 NULL, 1, scankeys);
1812
1813 tuple = systable_getnext(scan);
1814 if (!HeapTupleIsValid(tuple))
1815 elog(ERROR, "could not find tuple for attrdef %u", attrdefId);
1816
1817 myrelid = ((Form_pg_attrdef) GETSTRUCT(tuple))->adrelid;
1818 myattnum = ((Form_pg_attrdef) GETSTRUCT(tuple))->adnum;
1819
1820 /* Get an exclusive lock on the relation owning the attribute */
1821 myrel = relation_open(myrelid, AccessExclusiveLock);
1822
1823 /* Now we can delete the pg_attrdef row */
1824 CatalogTupleDelete(attrdef_rel, &tuple->t_self);
1825
1826 systable_endscan(scan);
1827 table_close(attrdef_rel, RowExclusiveLock);
1828
1829 /* Fix the pg_attribute row */
1830 attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
1831
1832 tuple = SearchSysCacheCopy2(ATTNUM,
1833 ObjectIdGetDatum(myrelid),
1834 Int16GetDatum(myattnum));
1835 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
1836 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1837 myattnum, myrelid);
1838
1839 ((Form_pg_attribute) GETSTRUCT(tuple))->atthasdef = false;
1840
1841 CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
1842
1843 /*
1844 * Our update of the pg_attribute row will force a relcache rebuild, so
1845 * there's nothing else to do here.
1846 */
1847 table_close(attr_rel, RowExclusiveLock);
1848
1849 /* Keep lock on attribute's rel until end of xact */
1850 relation_close(myrel, NoLock);
1851 }
1852
1853 /*
1854 * heap_drop_with_catalog - removes specified relation from catalogs
1855 *
1856 * Note that this routine is not responsible for dropping objects that are
1857 * linked to the pg_class entry via dependencies (for example, indexes and
1858 * constraints). Those are deleted by the dependency-tracing logic in
1859 * dependency.c before control gets here. In general, therefore, this routine
1860 * should never be called directly; go through performDeletion() instead.
1861 */
1862 void
heap_drop_with_catalog(Oid relid)1863 heap_drop_with_catalog(Oid relid)
1864 {
1865 Relation rel;
1866 HeapTuple tuple;
1867 Oid parentOid = InvalidOid,
1868 defaultPartOid = InvalidOid;
1869
1870 /*
1871 * To drop a partition safely, we must grab exclusive lock on its parent,
1872 * because another backend might be about to execute a query on the parent
1873 * table. If it relies on previously cached partition descriptor, then it
1874 * could attempt to access the just-dropped relation as its partition. We
1875 * must therefore take a table lock strong enough to prevent all queries
1876 * on the table from proceeding until we commit and send out a
1877 * shared-cache-inval notice that will make them update their partition
1878 * descriptors.
1879 */
1880 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1881 if (!HeapTupleIsValid(tuple))
1882 elog(ERROR, "cache lookup failed for relation %u", relid);
1883 if (((Form_pg_class) GETSTRUCT(tuple))->relispartition)
1884 {
1885 parentOid = get_partition_parent(relid);
1886 LockRelationOid(parentOid, AccessExclusiveLock);
1887
1888 /*
1889 * If this is not the default partition, dropping it will change the
1890 * default partition's partition constraint, so we must lock it.
1891 */
1892 defaultPartOid = get_default_partition_oid(parentOid);
1893 if (OidIsValid(defaultPartOid) && relid != defaultPartOid)
1894 LockRelationOid(defaultPartOid, AccessExclusiveLock);
1895 }
1896
1897 ReleaseSysCache(tuple);
1898
1899 /*
1900 * Open and lock the relation.
1901 */
1902 rel = relation_open(relid, AccessExclusiveLock);
1903
1904 /*
1905 * There can no longer be anyone *else* touching the relation, but we
1906 * might still have open queries or cursors, or pending trigger events, in
1907 * our own session.
1908 */
1909 CheckTableNotInUse(rel, "DROP TABLE");
1910
1911 /*
1912 * This effectively deletes all rows in the table, and may be done in a
1913 * serializable transaction. In that case we must record a rw-conflict in
1914 * to this transaction from each transaction holding a predicate lock on
1915 * the table.
1916 */
1917 CheckTableForSerializableConflictIn(rel);
1918
1919 /*
1920 * Delete pg_foreign_table tuple first.
1921 */
1922 if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1923 {
1924 Relation rel;
1925 HeapTuple tuple;
1926
1927 rel = table_open(ForeignTableRelationId, RowExclusiveLock);
1928
1929 tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
1930 if (!HeapTupleIsValid(tuple))
1931 elog(ERROR, "cache lookup failed for foreign table %u", relid);
1932
1933 CatalogTupleDelete(rel, &tuple->t_self);
1934
1935 ReleaseSysCache(tuple);
1936 table_close(rel, RowExclusiveLock);
1937 }
1938
1939 /*
1940 * If a partitioned table, delete the pg_partitioned_table tuple.
1941 */
1942 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1943 RemovePartitionKeyByRelId(relid);
1944
1945 /*
1946 * If the relation being dropped is the default partition itself,
1947 * invalidate its entry in pg_partitioned_table.
1948 */
1949 if (relid == defaultPartOid)
1950 update_default_partition_oid(parentOid, InvalidOid);
1951
1952 /*
1953 * Schedule unlinking of the relation's physical files at commit.
1954 */
1955 if (rel->rd_rel->relkind != RELKIND_VIEW &&
1956 rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
1957 rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
1958 rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1959 {
1960 RelationDropStorage(rel);
1961 }
1962
1963 /*
1964 * Close relcache entry, but *keep* AccessExclusiveLock on the relation
1965 * until transaction commit. This ensures no one else will try to do
1966 * something with the doomed relation.
1967 */
1968 relation_close(rel, NoLock);
1969
1970 /*
1971 * Remove any associated relation synchronization states.
1972 */
1973 RemoveSubscriptionRel(InvalidOid, relid);
1974
1975 /*
1976 * Forget any ON COMMIT action for the rel
1977 */
1978 remove_on_commit_action(relid);
1979
1980 /*
1981 * Flush the relation from the relcache. We want to do this before
1982 * starting to remove catalog entries, just to be certain that no relcache
1983 * entry rebuild will happen partway through. (That should not really
1984 * matter, since we don't do CommandCounterIncrement here, but let's be
1985 * safe.)
1986 */
1987 RelationForgetRelation(relid);
1988
1989 /*
1990 * remove inheritance information
1991 */
1992 RelationRemoveInheritance(relid);
1993
1994 /*
1995 * delete statistics
1996 */
1997 RemoveStatistics(relid, 0);
1998
1999 /*
2000 * delete attribute tuples
2001 */
2002 DeleteAttributeTuples(relid);
2003
2004 /*
2005 * delete relation tuple
2006 */
2007 DeleteRelationTuple(relid);
2008
2009 if (OidIsValid(parentOid))
2010 {
2011 /*
2012 * If this is not the default partition, the partition constraint of
2013 * the default partition has changed to include the portion of the key
2014 * space previously covered by the dropped partition.
2015 */
2016 if (OidIsValid(defaultPartOid) && relid != defaultPartOid)
2017 CacheInvalidateRelcacheByRelid(defaultPartOid);
2018
2019 /*
2020 * Invalidate the parent's relcache so that the partition is no longer
2021 * included in its partition descriptor.
2022 */
2023 CacheInvalidateRelcacheByRelid(parentOid);
2024 /* keep the lock */
2025 }
2026 }
2027
2028
2029 /*
2030 * RelationClearMissing
2031 *
2032 * Set atthasmissing and attmissingval to false/null for all attributes
2033 * where they are currently set. This can be safely and usefully done if
2034 * the table is rewritten (e.g. by VACUUM FULL or CLUSTER) where we know there
2035 * are no rows left with less than a full complement of attributes.
2036 *
2037 * The caller must have an AccessExclusive lock on the relation.
2038 */
2039 void
RelationClearMissing(Relation rel)2040 RelationClearMissing(Relation rel)
2041 {
2042 Relation attr_rel;
2043 Oid relid = RelationGetRelid(rel);
2044 int natts = RelationGetNumberOfAttributes(rel);
2045 int attnum;
2046 Datum repl_val[Natts_pg_attribute];
2047 bool repl_null[Natts_pg_attribute];
2048 bool repl_repl[Natts_pg_attribute];
2049 Form_pg_attribute attrtuple;
2050 HeapTuple tuple,
2051 newtuple;
2052
2053 memset(repl_val, 0, sizeof(repl_val));
2054 memset(repl_null, false, sizeof(repl_null));
2055 memset(repl_repl, false, sizeof(repl_repl));
2056
2057 repl_val[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(false);
2058 repl_null[Anum_pg_attribute_attmissingval - 1] = true;
2059
2060 repl_repl[Anum_pg_attribute_atthasmissing - 1] = true;
2061 repl_repl[Anum_pg_attribute_attmissingval - 1] = true;
2062
2063
2064 /* Get a lock on pg_attribute */
2065 attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
2066
2067 /* process each non-system attribute, including any dropped columns */
2068 for (attnum = 1; attnum <= natts; attnum++)
2069 {
2070 tuple = SearchSysCache2(ATTNUM,
2071 ObjectIdGetDatum(relid),
2072 Int16GetDatum(attnum));
2073 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
2074 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
2075 attnum, relid);
2076
2077 attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
2078
2079 /* ignore any where atthasmissing is not true */
2080 if (attrtuple->atthasmissing)
2081 {
2082 newtuple = heap_modify_tuple(tuple, RelationGetDescr(attr_rel),
2083 repl_val, repl_null, repl_repl);
2084
2085 CatalogTupleUpdate(attr_rel, &newtuple->t_self, newtuple);
2086
2087 heap_freetuple(newtuple);
2088 }
2089
2090 ReleaseSysCache(tuple);
2091 }
2092
2093 /*
2094 * Our update of the pg_attribute rows will force a relcache rebuild, so
2095 * there's nothing else to do here.
2096 */
2097 table_close(attr_rel, RowExclusiveLock);
2098 }
2099
2100 /*
2101 * SetAttrMissing
2102 *
2103 * Set the missing value of a single attribute. This should only be used by
2104 * binary upgrade. Takes an AccessExclusive lock on the relation owning the
2105 * attribute.
2106 */
2107 void
SetAttrMissing(Oid relid,char * attname,char * value)2108 SetAttrMissing(Oid relid, char *attname, char *value)
2109 {
2110 Datum valuesAtt[Natts_pg_attribute];
2111 bool nullsAtt[Natts_pg_attribute];
2112 bool replacesAtt[Natts_pg_attribute];
2113 Datum missingval;
2114 Form_pg_attribute attStruct;
2115 Relation attrrel,
2116 tablerel;
2117 HeapTuple atttup,
2118 newtup;
2119
2120 /* lock the table the attribute belongs to */
2121 tablerel = table_open(relid, AccessExclusiveLock);
2122
2123 /* Don't do anything unless it's a plain table */
2124 if (tablerel->rd_rel->relkind != RELKIND_RELATION)
2125 {
2126 table_close(tablerel, AccessExclusiveLock);
2127 return;
2128 }
2129
2130 /* Lock the attribute row and get the data */
2131 attrrel = table_open(AttributeRelationId, RowExclusiveLock);
2132 atttup = SearchSysCacheAttName(relid, attname);
2133 if (!HeapTupleIsValid(atttup))
2134 elog(ERROR, "cache lookup failed for attribute %s of relation %u",
2135 attname, relid);
2136 attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
2137
2138 /* get an array value from the value string */
2139 missingval = OidFunctionCall3(F_ARRAY_IN,
2140 CStringGetDatum(value),
2141 ObjectIdGetDatum(attStruct->atttypid),
2142 Int32GetDatum(attStruct->atttypmod));
2143
2144 /* update the tuple - set atthasmissing and attmissingval */
2145 MemSet(valuesAtt, 0, sizeof(valuesAtt));
2146 MemSet(nullsAtt, false, sizeof(nullsAtt));
2147 MemSet(replacesAtt, false, sizeof(replacesAtt));
2148
2149 valuesAtt[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(true);
2150 replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
2151 valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
2152 replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
2153
2154 newtup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
2155 valuesAtt, nullsAtt, replacesAtt);
2156 CatalogTupleUpdate(attrrel, &newtup->t_self, newtup);
2157
2158 /* clean up */
2159 ReleaseSysCache(atttup);
2160 table_close(attrrel, RowExclusiveLock);
2161 table_close(tablerel, AccessExclusiveLock);
2162 }
2163
2164 /*
2165 * Store a default expression for column attnum of relation rel.
2166 *
2167 * Returns the OID of the new pg_attrdef tuple.
2168 *
2169 * add_column_mode must be true if we are storing the default for a new
2170 * attribute, and false if it's for an already existing attribute. The reason
2171 * for this is that the missing value must never be updated after it is set,
2172 * which can only be when a column is added to the table. Otherwise we would
2173 * in effect be changing existing tuples.
2174 */
2175 Oid
StoreAttrDefault(Relation rel,AttrNumber attnum,Node * expr,bool is_internal,bool add_column_mode)2176 StoreAttrDefault(Relation rel, AttrNumber attnum,
2177 Node *expr, bool is_internal, bool add_column_mode)
2178 {
2179 char *adbin;
2180 Relation adrel;
2181 HeapTuple tuple;
2182 Datum values[4];
2183 static bool nulls[4] = {false, false, false, false};
2184 Relation attrrel;
2185 HeapTuple atttup;
2186 Form_pg_attribute attStruct;
2187 char attgenerated;
2188 Oid attrdefOid;
2189 ObjectAddress colobject,
2190 defobject;
2191
2192 adrel = table_open(AttrDefaultRelationId, RowExclusiveLock);
2193
2194 /*
2195 * Flatten expression to string form for storage.
2196 */
2197 adbin = nodeToString(expr);
2198
2199 /*
2200 * Make the pg_attrdef entry.
2201 */
2202 attrdefOid = GetNewOidWithIndex(adrel, AttrDefaultOidIndexId,
2203 Anum_pg_attrdef_oid);
2204 values[Anum_pg_attrdef_oid - 1] = ObjectIdGetDatum(attrdefOid);
2205 values[Anum_pg_attrdef_adrelid - 1] = RelationGetRelid(rel);
2206 values[Anum_pg_attrdef_adnum - 1] = attnum;
2207 values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
2208
2209 tuple = heap_form_tuple(adrel->rd_att, values, nulls);
2210 CatalogTupleInsert(adrel, tuple);
2211
2212 defobject.classId = AttrDefaultRelationId;
2213 defobject.objectId = attrdefOid;
2214 defobject.objectSubId = 0;
2215
2216 table_close(adrel, RowExclusiveLock);
2217
2218 /* now can free some of the stuff allocated above */
2219 pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
2220 heap_freetuple(tuple);
2221 pfree(adbin);
2222
2223 /*
2224 * Update the pg_attribute entry for the column to show that a default
2225 * exists.
2226 */
2227 attrrel = table_open(AttributeRelationId, RowExclusiveLock);
2228 atttup = SearchSysCacheCopy2(ATTNUM,
2229 ObjectIdGetDatum(RelationGetRelid(rel)),
2230 Int16GetDatum(attnum));
2231 if (!HeapTupleIsValid(atttup))
2232 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
2233 attnum, RelationGetRelid(rel));
2234 attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
2235 attgenerated = attStruct->attgenerated;
2236 if (!attStruct->atthasdef)
2237 {
2238 Form_pg_attribute defAttStruct;
2239
2240 ExprState *exprState;
2241 Expr *expr2 = (Expr *) expr;
2242 EState *estate = NULL;
2243 ExprContext *econtext;
2244 Datum valuesAtt[Natts_pg_attribute];
2245 bool nullsAtt[Natts_pg_attribute];
2246 bool replacesAtt[Natts_pg_attribute];
2247 Datum missingval = (Datum) 0;
2248 bool missingIsNull = true;
2249
2250 MemSet(valuesAtt, 0, sizeof(valuesAtt));
2251 MemSet(nullsAtt, false, sizeof(nullsAtt));
2252 MemSet(replacesAtt, false, sizeof(replacesAtt));
2253 valuesAtt[Anum_pg_attribute_atthasdef - 1] = true;
2254 replacesAtt[Anum_pg_attribute_atthasdef - 1] = true;
2255
2256 if (rel->rd_rel->relkind == RELKIND_RELATION && add_column_mode &&
2257 !attgenerated)
2258 {
2259 expr2 = expression_planner(expr2);
2260 estate = CreateExecutorState();
2261 exprState = ExecPrepareExpr(expr2, estate);
2262 econtext = GetPerTupleExprContext(estate);
2263
2264 missingval = ExecEvalExpr(exprState, econtext,
2265 &missingIsNull);
2266
2267 FreeExecutorState(estate);
2268
2269 defAttStruct = TupleDescAttr(rel->rd_att, attnum - 1);
2270
2271 if (missingIsNull)
2272 {
2273 /* if the default evaluates to NULL, just store a NULL array */
2274 missingval = (Datum) 0;
2275 }
2276 else
2277 {
2278 /* otherwise make a one-element array of the value */
2279 missingval = PointerGetDatum(construct_array(&missingval,
2280 1,
2281 defAttStruct->atttypid,
2282 defAttStruct->attlen,
2283 defAttStruct->attbyval,
2284 defAttStruct->attalign));
2285 }
2286
2287 valuesAtt[Anum_pg_attribute_atthasmissing - 1] = !missingIsNull;
2288 replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
2289 valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
2290 replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
2291 nullsAtt[Anum_pg_attribute_attmissingval - 1] = missingIsNull;
2292 }
2293 atttup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
2294 valuesAtt, nullsAtt, replacesAtt);
2295
2296 CatalogTupleUpdate(attrrel, &atttup->t_self, atttup);
2297
2298 if (!missingIsNull)
2299 pfree(DatumGetPointer(missingval));
2300
2301 }
2302 table_close(attrrel, RowExclusiveLock);
2303 heap_freetuple(atttup);
2304
2305 /*
2306 * Make a dependency so that the pg_attrdef entry goes away if the column
2307 * (or whole table) is deleted.
2308 */
2309 colobject.classId = RelationRelationId;
2310 colobject.objectId = RelationGetRelid(rel);
2311 colobject.objectSubId = attnum;
2312
2313 recordDependencyOn(&defobject, &colobject, DEPENDENCY_AUTO);
2314
2315 /*
2316 * Record dependencies on objects used in the expression, too.
2317 */
2318 if (attgenerated)
2319 {
2320 /*
2321 * Generated column: Dropping anything that the generation expression
2322 * refers to automatically drops the generated column.
2323 */
2324 recordDependencyOnSingleRelExpr(&colobject, expr, RelationGetRelid(rel),
2325 DEPENDENCY_AUTO,
2326 DEPENDENCY_AUTO, false);
2327 }
2328 else
2329 {
2330 /*
2331 * Normal default: Dropping anything that the default refers to
2332 * requires CASCADE and drops the default only.
2333 */
2334 recordDependencyOnSingleRelExpr(&defobject, expr, RelationGetRelid(rel),
2335 DEPENDENCY_NORMAL,
2336 DEPENDENCY_NORMAL, false);
2337 }
2338
2339 /*
2340 * Post creation hook for attribute defaults.
2341 *
2342 * XXX. ALTER TABLE ALTER COLUMN SET/DROP DEFAULT is implemented with a
2343 * couple of deletion/creation of the attribute's default entry, so the
2344 * callee should check existence of an older version of this entry if it
2345 * needs to distinguish.
2346 */
2347 InvokeObjectPostCreateHookArg(AttrDefaultRelationId,
2348 RelationGetRelid(rel), attnum, is_internal);
2349
2350 return attrdefOid;
2351 }
2352
2353 /*
2354 * Store a check-constraint expression for the given relation.
2355 *
2356 * Caller is responsible for updating the count of constraints
2357 * in the pg_class entry for the relation.
2358 *
2359 * The OID of the new constraint is returned.
2360 */
2361 static Oid
StoreRelCheck(Relation rel,const char * ccname,Node * expr,bool is_validated,bool is_local,int inhcount,bool is_no_inherit,bool is_internal)2362 StoreRelCheck(Relation rel, const char *ccname, Node *expr,
2363 bool is_validated, bool is_local, int inhcount,
2364 bool is_no_inherit, bool is_internal)
2365 {
2366 char *ccbin;
2367 List *varList;
2368 int keycount;
2369 int16 *attNos;
2370 Oid constrOid;
2371
2372 /*
2373 * Flatten expression to string form for storage.
2374 */
2375 ccbin = nodeToString(expr);
2376
2377 /*
2378 * Find columns of rel that are used in expr
2379 *
2380 * NB: pull_var_clause is okay here only because we don't allow subselects
2381 * in check constraints; it would fail to examine the contents of
2382 * subselects.
2383 */
2384 varList = pull_var_clause(expr, 0);
2385 keycount = list_length(varList);
2386
2387 if (keycount > 0)
2388 {
2389 ListCell *vl;
2390 int i = 0;
2391
2392 attNos = (int16 *) palloc(keycount * sizeof(int16));
2393 foreach(vl, varList)
2394 {
2395 Var *var = (Var *) lfirst(vl);
2396 int j;
2397
2398 for (j = 0; j < i; j++)
2399 if (attNos[j] == var->varattno)
2400 break;
2401 if (j == i)
2402 attNos[i++] = var->varattno;
2403 }
2404 keycount = i;
2405 }
2406 else
2407 attNos = NULL;
2408
2409 /*
2410 * Partitioned tables do not contain any rows themselves, so a NO INHERIT
2411 * constraint makes no sense.
2412 */
2413 if (is_no_inherit &&
2414 rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2415 ereport(ERROR,
2416 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2417 errmsg("cannot add NO INHERIT constraint to partitioned table \"%s\"",
2418 RelationGetRelationName(rel))));
2419
2420 /*
2421 * Create the Check Constraint
2422 */
2423 constrOid =
2424 CreateConstraintEntry(ccname, /* Constraint Name */
2425 RelationGetNamespace(rel), /* namespace */
2426 CONSTRAINT_CHECK, /* Constraint Type */
2427 false, /* Is Deferrable */
2428 false, /* Is Deferred */
2429 is_validated,
2430 InvalidOid, /* no parent constraint */
2431 RelationGetRelid(rel), /* relation */
2432 attNos, /* attrs in the constraint */
2433 keycount, /* # key attrs in the constraint */
2434 keycount, /* # total attrs in the constraint */
2435 InvalidOid, /* not a domain constraint */
2436 InvalidOid, /* no associated index */
2437 InvalidOid, /* Foreign key fields */
2438 NULL,
2439 NULL,
2440 NULL,
2441 NULL,
2442 0,
2443 ' ',
2444 ' ',
2445 ' ',
2446 NULL, /* not an exclusion constraint */
2447 expr, /* Tree form of check constraint */
2448 ccbin, /* Binary form of check constraint */
2449 is_local, /* conislocal */
2450 inhcount, /* coninhcount */
2451 is_no_inherit, /* connoinherit */
2452 is_internal); /* internally constructed? */
2453
2454 pfree(ccbin);
2455
2456 return constrOid;
2457 }
2458
2459 /*
2460 * Store defaults and constraints (passed as a list of CookedConstraint).
2461 *
2462 * Each CookedConstraint struct is modified to store the new catalog tuple OID.
2463 *
2464 * NOTE: only pre-cooked expressions will be passed this way, which is to
2465 * say expressions inherited from an existing relation. Newly parsed
2466 * expressions can be added later, by direct calls to StoreAttrDefault
2467 * and StoreRelCheck (see AddRelationNewConstraints()).
2468 */
2469 static void
StoreConstraints(Relation rel,List * cooked_constraints,bool is_internal)2470 StoreConstraints(Relation rel, List *cooked_constraints, bool is_internal)
2471 {
2472 int numchecks = 0;
2473 ListCell *lc;
2474
2475 if (cooked_constraints == NIL)
2476 return; /* nothing to do */
2477
2478 /*
2479 * Deparsing of constraint expressions will fail unless the just-created
2480 * pg_attribute tuples for this relation are made visible. So, bump the
2481 * command counter. CAUTION: this will cause a relcache entry rebuild.
2482 */
2483 CommandCounterIncrement();
2484
2485 foreach(lc, cooked_constraints)
2486 {
2487 CookedConstraint *con = (CookedConstraint *) lfirst(lc);
2488
2489 switch (con->contype)
2490 {
2491 case CONSTR_DEFAULT:
2492 con->conoid = StoreAttrDefault(rel, con->attnum, con->expr,
2493 is_internal, false);
2494 break;
2495 case CONSTR_CHECK:
2496 con->conoid =
2497 StoreRelCheck(rel, con->name, con->expr,
2498 !con->skip_validation, con->is_local,
2499 con->inhcount, con->is_no_inherit,
2500 is_internal);
2501 numchecks++;
2502 break;
2503 default:
2504 elog(ERROR, "unrecognized constraint type: %d",
2505 (int) con->contype);
2506 }
2507 }
2508
2509 if (numchecks > 0)
2510 SetRelationNumChecks(rel, numchecks);
2511 }
2512
2513 /*
2514 * AddRelationNewConstraints
2515 *
2516 * Add new column default expressions and/or constraint check expressions
2517 * to an existing relation. This is defined to do both for efficiency in
2518 * DefineRelation, but of course you can do just one or the other by passing
2519 * empty lists.
2520 *
2521 * rel: relation to be modified
2522 * newColDefaults: list of RawColumnDefault structures
2523 * newConstraints: list of Constraint nodes
2524 * allow_merge: true if check constraints may be merged with existing ones
2525 * is_local: true if definition is local, false if it's inherited
2526 * is_internal: true if result of some internal process, not a user request
2527 *
2528 * All entries in newColDefaults will be processed. Entries in newConstraints
2529 * will be processed only if they are CONSTR_CHECK type.
2530 *
2531 * Returns a list of CookedConstraint nodes that shows the cooked form of
2532 * the default and constraint expressions added to the relation.
2533 *
2534 * NB: caller should have opened rel with AccessExclusiveLock, and should
2535 * hold that lock till end of transaction. Also, we assume the caller has
2536 * done a CommandCounterIncrement if necessary to make the relation's catalog
2537 * tuples visible.
2538 */
2539 List *
AddRelationNewConstraints(Relation rel,List * newColDefaults,List * newConstraints,bool allow_merge,bool is_local,bool is_internal,const char * queryString)2540 AddRelationNewConstraints(Relation rel,
2541 List *newColDefaults,
2542 List *newConstraints,
2543 bool allow_merge,
2544 bool is_local,
2545 bool is_internal,
2546 const char *queryString)
2547 {
2548 List *cookedConstraints = NIL;
2549 TupleDesc tupleDesc;
2550 TupleConstr *oldconstr;
2551 int numoldchecks;
2552 ParseState *pstate;
2553 ParseNamespaceItem *nsitem;
2554 int numchecks;
2555 List *checknames;
2556 ListCell *cell;
2557 Node *expr;
2558 CookedConstraint *cooked;
2559
2560 /*
2561 * Get info about existing constraints.
2562 */
2563 tupleDesc = RelationGetDescr(rel);
2564 oldconstr = tupleDesc->constr;
2565 if (oldconstr)
2566 numoldchecks = oldconstr->num_check;
2567 else
2568 numoldchecks = 0;
2569
2570 /*
2571 * Create a dummy ParseState and insert the target relation as its sole
2572 * rangetable entry. We need a ParseState for transformExpr.
2573 */
2574 pstate = make_parsestate(NULL);
2575 pstate->p_sourcetext = queryString;
2576 nsitem = addRangeTableEntryForRelation(pstate,
2577 rel,
2578 AccessShareLock,
2579 NULL,
2580 false,
2581 true);
2582 addNSItemToQuery(pstate, nsitem, true, true, true);
2583
2584 /*
2585 * Process column default expressions.
2586 */
2587 foreach(cell, newColDefaults)
2588 {
2589 RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
2590 Form_pg_attribute atp = TupleDescAttr(rel->rd_att, colDef->attnum - 1);
2591 Oid defOid;
2592
2593 expr = cookDefault(pstate, colDef->raw_default,
2594 atp->atttypid, atp->atttypmod,
2595 NameStr(atp->attname),
2596 atp->attgenerated);
2597
2598 /*
2599 * If the expression is just a NULL constant, we do not bother to make
2600 * an explicit pg_attrdef entry, since the default behavior is
2601 * equivalent. This applies to column defaults, but not for
2602 * generation expressions.
2603 *
2604 * Note a nonobvious property of this test: if the column is of a
2605 * domain type, what we'll get is not a bare null Const but a
2606 * CoerceToDomain expr, so we will not discard the default. This is
2607 * critical because the column default needs to be retained to
2608 * override any default that the domain might have.
2609 */
2610 if (expr == NULL ||
2611 (!colDef->generated &&
2612 IsA(expr, Const) &&
2613 castNode(Const, expr)->constisnull))
2614 continue;
2615
2616 /* If the DEFAULT is volatile we cannot use a missing value */
2617 if (colDef->missingMode && contain_volatile_functions((Node *) expr))
2618 colDef->missingMode = false;
2619
2620 defOid = StoreAttrDefault(rel, colDef->attnum, expr, is_internal,
2621 colDef->missingMode);
2622
2623 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2624 cooked->contype = CONSTR_DEFAULT;
2625 cooked->conoid = defOid;
2626 cooked->name = NULL;
2627 cooked->attnum = colDef->attnum;
2628 cooked->expr = expr;
2629 cooked->skip_validation = false;
2630 cooked->is_local = is_local;
2631 cooked->inhcount = is_local ? 0 : 1;
2632 cooked->is_no_inherit = false;
2633 cookedConstraints = lappend(cookedConstraints, cooked);
2634 }
2635
2636 /*
2637 * Process constraint expressions.
2638 */
2639 numchecks = numoldchecks;
2640 checknames = NIL;
2641 foreach(cell, newConstraints)
2642 {
2643 Constraint *cdef = (Constraint *) lfirst(cell);
2644 char *ccname;
2645 Oid constrOid;
2646
2647 if (cdef->contype != CONSTR_CHECK)
2648 continue;
2649
2650 if (cdef->raw_expr != NULL)
2651 {
2652 Assert(cdef->cooked_expr == NULL);
2653
2654 /*
2655 * Transform raw parsetree to executable expression, and verify
2656 * it's valid as a CHECK constraint.
2657 */
2658 expr = cookConstraint(pstate, cdef->raw_expr,
2659 RelationGetRelationName(rel));
2660 }
2661 else
2662 {
2663 Assert(cdef->cooked_expr != NULL);
2664
2665 /*
2666 * Here, we assume the parser will only pass us valid CHECK
2667 * expressions, so we do no particular checking.
2668 */
2669 expr = stringToNode(cdef->cooked_expr);
2670 }
2671
2672 /*
2673 * Check name uniqueness, or generate a name if none was given.
2674 */
2675 if (cdef->conname != NULL)
2676 {
2677 ListCell *cell2;
2678
2679 ccname = cdef->conname;
2680 /* Check against other new constraints */
2681 /* Needed because we don't do CommandCounterIncrement in loop */
2682 foreach(cell2, checknames)
2683 {
2684 if (strcmp((char *) lfirst(cell2), ccname) == 0)
2685 ereport(ERROR,
2686 (errcode(ERRCODE_DUPLICATE_OBJECT),
2687 errmsg("check constraint \"%s\" already exists",
2688 ccname)));
2689 }
2690
2691 /* save name for future checks */
2692 checknames = lappend(checknames, ccname);
2693
2694 /*
2695 * Check against pre-existing constraints. If we are allowed to
2696 * merge with an existing constraint, there's no more to do here.
2697 * (We omit the duplicate constraint from the result, which is
2698 * what ATAddCheckConstraint wants.)
2699 */
2700 if (MergeWithExistingConstraint(rel, ccname, expr,
2701 allow_merge, is_local,
2702 cdef->initially_valid,
2703 cdef->is_no_inherit))
2704 continue;
2705 }
2706 else
2707 {
2708 /*
2709 * When generating a name, we want to create "tab_col_check" for a
2710 * column constraint and "tab_check" for a table constraint. We
2711 * no longer have any info about the syntactic positioning of the
2712 * constraint phrase, so we approximate this by seeing whether the
2713 * expression references more than one column. (If the user
2714 * played by the rules, the result is the same...)
2715 *
2716 * Note: pull_var_clause() doesn't descend into sublinks, but we
2717 * eliminated those above; and anyway this only needs to be an
2718 * approximate answer.
2719 */
2720 List *vars;
2721 char *colname;
2722
2723 vars = pull_var_clause(expr, 0);
2724
2725 /* eliminate duplicates */
2726 vars = list_union(NIL, vars);
2727
2728 if (list_length(vars) == 1)
2729 colname = get_attname(RelationGetRelid(rel),
2730 ((Var *) linitial(vars))->varattno,
2731 true);
2732 else
2733 colname = NULL;
2734
2735 ccname = ChooseConstraintName(RelationGetRelationName(rel),
2736 colname,
2737 "check",
2738 RelationGetNamespace(rel),
2739 checknames);
2740
2741 /* save name for future checks */
2742 checknames = lappend(checknames, ccname);
2743 }
2744
2745 /*
2746 * OK, store it.
2747 */
2748 constrOid =
2749 StoreRelCheck(rel, ccname, expr, cdef->initially_valid, is_local,
2750 is_local ? 0 : 1, cdef->is_no_inherit, is_internal);
2751
2752 numchecks++;
2753
2754 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2755 cooked->contype = CONSTR_CHECK;
2756 cooked->conoid = constrOid;
2757 cooked->name = ccname;
2758 cooked->attnum = 0;
2759 cooked->expr = expr;
2760 cooked->skip_validation = cdef->skip_validation;
2761 cooked->is_local = is_local;
2762 cooked->inhcount = is_local ? 0 : 1;
2763 cooked->is_no_inherit = cdef->is_no_inherit;
2764 cookedConstraints = lappend(cookedConstraints, cooked);
2765 }
2766
2767 /*
2768 * Update the count of constraints in the relation's pg_class tuple. We do
2769 * this even if there was no change, in order to ensure that an SI update
2770 * message is sent out for the pg_class tuple, which will force other
2771 * backends to rebuild their relcache entries for the rel. (This is
2772 * critical if we added defaults but not constraints.)
2773 */
2774 SetRelationNumChecks(rel, numchecks);
2775
2776 return cookedConstraints;
2777 }
2778
2779 /*
2780 * Check for a pre-existing check constraint that conflicts with a proposed
2781 * new one, and either adjust its conislocal/coninhcount settings or throw
2782 * error as needed.
2783 *
2784 * Returns true if merged (constraint is a duplicate), or false if it's
2785 * got a so-far-unique name, or throws error if conflict.
2786 *
2787 * XXX See MergeConstraintsIntoExisting too if you change this code.
2788 */
2789 static bool
MergeWithExistingConstraint(Relation rel,const char * ccname,Node * expr,bool allow_merge,bool is_local,bool is_initially_valid,bool is_no_inherit)2790 MergeWithExistingConstraint(Relation rel, const char *ccname, Node *expr,
2791 bool allow_merge, bool is_local,
2792 bool is_initially_valid,
2793 bool is_no_inherit)
2794 {
2795 bool found;
2796 Relation conDesc;
2797 SysScanDesc conscan;
2798 ScanKeyData skey[3];
2799 HeapTuple tup;
2800
2801 /* Search for a pg_constraint entry with same name and relation */
2802 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
2803
2804 found = false;
2805
2806 ScanKeyInit(&skey[0],
2807 Anum_pg_constraint_conrelid,
2808 BTEqualStrategyNumber, F_OIDEQ,
2809 ObjectIdGetDatum(RelationGetRelid(rel)));
2810 ScanKeyInit(&skey[1],
2811 Anum_pg_constraint_contypid,
2812 BTEqualStrategyNumber, F_OIDEQ,
2813 ObjectIdGetDatum(InvalidOid));
2814 ScanKeyInit(&skey[2],
2815 Anum_pg_constraint_conname,
2816 BTEqualStrategyNumber, F_NAMEEQ,
2817 CStringGetDatum(ccname));
2818
2819 conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId, true,
2820 NULL, 3, skey);
2821
2822 /* There can be at most one matching row */
2823 if (HeapTupleIsValid(tup = systable_getnext(conscan)))
2824 {
2825 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
2826
2827 /* Found it. Conflicts if not identical check constraint */
2828 if (con->contype == CONSTRAINT_CHECK)
2829 {
2830 Datum val;
2831 bool isnull;
2832
2833 val = fastgetattr(tup,
2834 Anum_pg_constraint_conbin,
2835 conDesc->rd_att, &isnull);
2836 if (isnull)
2837 elog(ERROR, "null conbin for rel %s",
2838 RelationGetRelationName(rel));
2839 if (equal(expr, stringToNode(TextDatumGetCString(val))))
2840 found = true;
2841 }
2842
2843 /*
2844 * If the existing constraint is purely inherited (no local
2845 * definition) then interpret addition of a local constraint as a
2846 * legal merge. This allows ALTER ADD CONSTRAINT on parent and child
2847 * tables to be given in either order with same end state. However if
2848 * the relation is a partition, all inherited constraints are always
2849 * non-local, including those that were merged.
2850 */
2851 if (is_local && !con->conislocal && !rel->rd_rel->relispartition)
2852 allow_merge = true;
2853
2854 if (!found || !allow_merge)
2855 ereport(ERROR,
2856 (errcode(ERRCODE_DUPLICATE_OBJECT),
2857 errmsg("constraint \"%s\" for relation \"%s\" already exists",
2858 ccname, RelationGetRelationName(rel))));
2859
2860 /* If the child constraint is "no inherit" then cannot merge */
2861 if (con->connoinherit)
2862 ereport(ERROR,
2863 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2864 errmsg("constraint \"%s\" conflicts with non-inherited constraint on relation \"%s\"",
2865 ccname, RelationGetRelationName(rel))));
2866
2867 /*
2868 * Must not change an existing inherited constraint to "no inherit"
2869 * status. That's because inherited constraints should be able to
2870 * propagate to lower-level children.
2871 */
2872 if (con->coninhcount > 0 && is_no_inherit)
2873 ereport(ERROR,
2874 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2875 errmsg("constraint \"%s\" conflicts with inherited constraint on relation \"%s\"",
2876 ccname, RelationGetRelationName(rel))));
2877
2878 /*
2879 * If the child constraint is "not valid" then cannot merge with a
2880 * valid parent constraint.
2881 */
2882 if (is_initially_valid && !con->convalidated)
2883 ereport(ERROR,
2884 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2885 errmsg("constraint \"%s\" conflicts with NOT VALID constraint on relation \"%s\"",
2886 ccname, RelationGetRelationName(rel))));
2887
2888 /* OK to update the tuple */
2889 ereport(NOTICE,
2890 (errmsg("merging constraint \"%s\" with inherited definition",
2891 ccname)));
2892
2893 tup = heap_copytuple(tup);
2894 con = (Form_pg_constraint) GETSTRUCT(tup);
2895
2896 /*
2897 * In case of partitions, an inherited constraint must be inherited
2898 * only once since it cannot have multiple parents and it is never
2899 * considered local.
2900 */
2901 if (rel->rd_rel->relispartition)
2902 {
2903 con->coninhcount = 1;
2904 con->conislocal = false;
2905 }
2906 else
2907 {
2908 if (is_local)
2909 con->conislocal = true;
2910 else
2911 con->coninhcount++;
2912 }
2913
2914 if (is_no_inherit)
2915 {
2916 Assert(is_local);
2917 con->connoinherit = true;
2918 }
2919
2920 CatalogTupleUpdate(conDesc, &tup->t_self, tup);
2921 }
2922
2923 systable_endscan(conscan);
2924 table_close(conDesc, RowExclusiveLock);
2925
2926 return found;
2927 }
2928
2929 /*
2930 * Update the count of constraints in the relation's pg_class tuple.
2931 *
2932 * Caller had better hold exclusive lock on the relation.
2933 *
2934 * An important side effect is that a SI update message will be sent out for
2935 * the pg_class tuple, which will force other backends to rebuild their
2936 * relcache entries for the rel. Also, this backend will rebuild its
2937 * own relcache entry at the next CommandCounterIncrement.
2938 */
2939 static void
SetRelationNumChecks(Relation rel,int numchecks)2940 SetRelationNumChecks(Relation rel, int numchecks)
2941 {
2942 Relation relrel;
2943 HeapTuple reltup;
2944 Form_pg_class relStruct;
2945
2946 relrel = table_open(RelationRelationId, RowExclusiveLock);
2947 reltup = SearchSysCacheCopy1(RELOID,
2948 ObjectIdGetDatum(RelationGetRelid(rel)));
2949 if (!HeapTupleIsValid(reltup))
2950 elog(ERROR, "cache lookup failed for relation %u",
2951 RelationGetRelid(rel));
2952 relStruct = (Form_pg_class) GETSTRUCT(reltup);
2953
2954 if (relStruct->relchecks != numchecks)
2955 {
2956 relStruct->relchecks = numchecks;
2957
2958 CatalogTupleUpdate(relrel, &reltup->t_self, reltup);
2959 }
2960 else
2961 {
2962 /* Skip the disk update, but force relcache inval anyway */
2963 CacheInvalidateRelcache(rel);
2964 }
2965
2966 heap_freetuple(reltup);
2967 table_close(relrel, RowExclusiveLock);
2968 }
2969
2970 /*
2971 * Check for references to generated columns
2972 */
2973 static bool
check_nested_generated_walker(Node * node,void * context)2974 check_nested_generated_walker(Node *node, void *context)
2975 {
2976 ParseState *pstate = context;
2977
2978 if (node == NULL)
2979 return false;
2980 else if (IsA(node, Var))
2981 {
2982 Var *var = (Var *) node;
2983 Oid relid;
2984 AttrNumber attnum;
2985
2986 relid = rt_fetch(var->varno, pstate->p_rtable)->relid;
2987 if (!OidIsValid(relid))
2988 return false; /* XXX shouldn't we raise an error? */
2989
2990 attnum = var->varattno;
2991
2992 if (attnum > 0 && get_attgenerated(relid, attnum))
2993 ereport(ERROR,
2994 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2995 errmsg("cannot use generated column \"%s\" in column generation expression",
2996 get_attname(relid, attnum, false)),
2997 errdetail("A generated column cannot reference another generated column."),
2998 parser_errposition(pstate, var->location)));
2999 /* A whole-row Var is necessarily self-referential, so forbid it */
3000 if (attnum == 0)
3001 ereport(ERROR,
3002 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3003 errmsg("cannot use whole-row variable in column generation expression"),
3004 errdetail("This would cause the generated column to depend on its own value."),
3005 parser_errposition(pstate, var->location)));
3006 /* System columns were already checked in the parser */
3007
3008 return false;
3009 }
3010 else
3011 return expression_tree_walker(node, check_nested_generated_walker,
3012 (void *) context);
3013 }
3014
3015 static void
check_nested_generated(ParseState * pstate,Node * node)3016 check_nested_generated(ParseState *pstate, Node *node)
3017 {
3018 check_nested_generated_walker(node, pstate);
3019 }
3020
3021 /*
3022 * Take a raw default and convert it to a cooked format ready for
3023 * storage.
3024 *
3025 * Parse state should be set up to recognize any vars that might appear
3026 * in the expression. (Even though we plan to reject vars, it's more
3027 * user-friendly to give the correct error message than "unknown var".)
3028 *
3029 * If atttypid is not InvalidOid, coerce the expression to the specified
3030 * type (and typmod atttypmod). attname is only needed in this case:
3031 * it is used in the error message, if any.
3032 */
3033 Node *
cookDefault(ParseState * pstate,Node * raw_default,Oid atttypid,int32 atttypmod,const char * attname,char attgenerated)3034 cookDefault(ParseState *pstate,
3035 Node *raw_default,
3036 Oid atttypid,
3037 int32 atttypmod,
3038 const char *attname,
3039 char attgenerated)
3040 {
3041 Node *expr;
3042
3043 Assert(raw_default != NULL);
3044
3045 /*
3046 * Transform raw parsetree to executable expression.
3047 */
3048 expr = transformExpr(pstate, raw_default, attgenerated ? EXPR_KIND_GENERATED_COLUMN : EXPR_KIND_COLUMN_DEFAULT);
3049
3050 if (attgenerated)
3051 {
3052 check_nested_generated(pstate, expr);
3053
3054 if (contain_mutable_functions(expr))
3055 ereport(ERROR,
3056 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3057 errmsg("generation expression is not immutable")));
3058 }
3059 else
3060 {
3061 /*
3062 * For a default expression, transformExpr() should have rejected
3063 * column references.
3064 */
3065 Assert(!contain_var_clause(expr));
3066 }
3067
3068 /*
3069 * Coerce the expression to the correct type and typmod, if given. This
3070 * should match the parser's processing of non-defaulted expressions ---
3071 * see transformAssignedExpr().
3072 */
3073 if (OidIsValid(atttypid))
3074 {
3075 Oid type_id = exprType(expr);
3076
3077 expr = coerce_to_target_type(pstate, expr, type_id,
3078 atttypid, atttypmod,
3079 COERCION_ASSIGNMENT,
3080 COERCE_IMPLICIT_CAST,
3081 -1);
3082 if (expr == NULL)
3083 ereport(ERROR,
3084 (errcode(ERRCODE_DATATYPE_MISMATCH),
3085 errmsg("column \"%s\" is of type %s"
3086 " but default expression is of type %s",
3087 attname,
3088 format_type_be(atttypid),
3089 format_type_be(type_id)),
3090 errhint("You will need to rewrite or cast the expression.")));
3091 }
3092
3093 /*
3094 * Finally, take care of collations in the finished expression.
3095 */
3096 assign_expr_collations(pstate, expr);
3097
3098 return expr;
3099 }
3100
3101 /*
3102 * Take a raw CHECK constraint expression and convert it to a cooked format
3103 * ready for storage.
3104 *
3105 * Parse state must be set up to recognize any vars that might appear
3106 * in the expression.
3107 */
3108 static Node *
cookConstraint(ParseState * pstate,Node * raw_constraint,char * relname)3109 cookConstraint(ParseState *pstate,
3110 Node *raw_constraint,
3111 char *relname)
3112 {
3113 Node *expr;
3114
3115 /*
3116 * Transform raw parsetree to executable expression.
3117 */
3118 expr = transformExpr(pstate, raw_constraint, EXPR_KIND_CHECK_CONSTRAINT);
3119
3120 /*
3121 * Make sure it yields a boolean result.
3122 */
3123 expr = coerce_to_boolean(pstate, expr, "CHECK");
3124
3125 /*
3126 * Take care of collations.
3127 */
3128 assign_expr_collations(pstate, expr);
3129
3130 /*
3131 * Make sure no outside relations are referred to (this is probably dead
3132 * code now that add_missing_from is history).
3133 */
3134 if (list_length(pstate->p_rtable) != 1)
3135 ereport(ERROR,
3136 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
3137 errmsg("only table \"%s\" can be referenced in check constraint",
3138 relname)));
3139
3140 return expr;
3141 }
3142
3143 /*
3144 * CopyStatistics --- copy entries in pg_statistic from one rel to another
3145 */
3146 void
CopyStatistics(Oid fromrelid,Oid torelid)3147 CopyStatistics(Oid fromrelid, Oid torelid)
3148 {
3149 HeapTuple tup;
3150 SysScanDesc scan;
3151 ScanKeyData key[1];
3152 Relation statrel;
3153
3154 statrel = table_open(StatisticRelationId, RowExclusiveLock);
3155
3156 /* Now search for stat records */
3157 ScanKeyInit(&key[0],
3158 Anum_pg_statistic_starelid,
3159 BTEqualStrategyNumber, F_OIDEQ,
3160 ObjectIdGetDatum(fromrelid));
3161
3162 scan = systable_beginscan(statrel, StatisticRelidAttnumInhIndexId,
3163 true, NULL, 1, key);
3164
3165 while (HeapTupleIsValid((tup = systable_getnext(scan))))
3166 {
3167 Form_pg_statistic statform;
3168
3169 /* make a modifiable copy */
3170 tup = heap_copytuple(tup);
3171 statform = (Form_pg_statistic) GETSTRUCT(tup);
3172
3173 /* update the copy of the tuple and insert it */
3174 statform->starelid = torelid;
3175 CatalogTupleInsert(statrel, tup);
3176
3177 heap_freetuple(tup);
3178 }
3179
3180 systable_endscan(scan);
3181
3182 table_close(statrel, RowExclusiveLock);
3183 }
3184
3185 /*
3186 * RemoveStatistics --- remove entries in pg_statistic for a rel or column
3187 *
3188 * If attnum is zero, remove all entries for rel; else remove only the one(s)
3189 * for that column.
3190 */
3191 void
RemoveStatistics(Oid relid,AttrNumber attnum)3192 RemoveStatistics(Oid relid, AttrNumber attnum)
3193 {
3194 Relation pgstatistic;
3195 SysScanDesc scan;
3196 ScanKeyData key[2];
3197 int nkeys;
3198 HeapTuple tuple;
3199
3200 pgstatistic = table_open(StatisticRelationId, RowExclusiveLock);
3201
3202 ScanKeyInit(&key[0],
3203 Anum_pg_statistic_starelid,
3204 BTEqualStrategyNumber, F_OIDEQ,
3205 ObjectIdGetDatum(relid));
3206
3207 if (attnum == 0)
3208 nkeys = 1;
3209 else
3210 {
3211 ScanKeyInit(&key[1],
3212 Anum_pg_statistic_staattnum,
3213 BTEqualStrategyNumber, F_INT2EQ,
3214 Int16GetDatum(attnum));
3215 nkeys = 2;
3216 }
3217
3218 scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
3219 NULL, nkeys, key);
3220
3221 /* we must loop even when attnum != 0, in case of inherited stats */
3222 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
3223 CatalogTupleDelete(pgstatistic, &tuple->t_self);
3224
3225 systable_endscan(scan);
3226
3227 table_close(pgstatistic, RowExclusiveLock);
3228 }
3229
3230
3231 /*
3232 * RelationTruncateIndexes - truncate all indexes associated
3233 * with the heap relation to zero tuples.
3234 *
3235 * The routine will truncate and then reconstruct the indexes on
3236 * the specified relation. Caller must hold exclusive lock on rel.
3237 */
3238 static void
RelationTruncateIndexes(Relation heapRelation)3239 RelationTruncateIndexes(Relation heapRelation)
3240 {
3241 ListCell *indlist;
3242
3243 /* Ask the relcache to produce a list of the indexes of the rel */
3244 foreach(indlist, RelationGetIndexList(heapRelation))
3245 {
3246 Oid indexId = lfirst_oid(indlist);
3247 Relation currentIndex;
3248 IndexInfo *indexInfo;
3249
3250 /* Open the index relation; use exclusive lock, just to be sure */
3251 currentIndex = index_open(indexId, AccessExclusiveLock);
3252
3253 /*
3254 * Fetch info needed for index_build. Since we know there are no
3255 * tuples that actually need indexing, we can use a dummy IndexInfo.
3256 * This is slightly cheaper to build, but the real point is to avoid
3257 * possibly running user-defined code in index expressions or
3258 * predicates. We might be getting invoked during ON COMMIT
3259 * processing, and we don't want to run any such code then.
3260 */
3261 indexInfo = BuildDummyIndexInfo(currentIndex);
3262
3263 /*
3264 * Now truncate the actual file (and discard buffers).
3265 */
3266 RelationTruncate(currentIndex, 0);
3267
3268 /* Initialize the index and rebuild */
3269 /* Note: we do not need to re-establish pkey setting */
3270 index_build(heapRelation, currentIndex, indexInfo, true, false);
3271
3272 /* We're done with this index */
3273 index_close(currentIndex, NoLock);
3274 }
3275 }
3276
3277 /*
3278 * heap_truncate
3279 *
3280 * This routine deletes all data within all the specified relations.
3281 *
3282 * This is not transaction-safe! There is another, transaction-safe
3283 * implementation in commands/tablecmds.c. We now use this only for
3284 * ON COMMIT truncation of temporary tables, where it doesn't matter.
3285 */
3286 void
heap_truncate(List * relids)3287 heap_truncate(List *relids)
3288 {
3289 List *relations = NIL;
3290 ListCell *cell;
3291
3292 /* Open relations for processing, and grab exclusive access on each */
3293 foreach(cell, relids)
3294 {
3295 Oid rid = lfirst_oid(cell);
3296 Relation rel;
3297
3298 rel = table_open(rid, AccessExclusiveLock);
3299 relations = lappend(relations, rel);
3300 }
3301
3302 /* Don't allow truncate on tables that are referenced by foreign keys */
3303 heap_truncate_check_FKs(relations, true);
3304
3305 /* OK to do it */
3306 foreach(cell, relations)
3307 {
3308 Relation rel = lfirst(cell);
3309
3310 /* Truncate the relation */
3311 heap_truncate_one_rel(rel);
3312
3313 /* Close the relation, but keep exclusive lock on it until commit */
3314 table_close(rel, NoLock);
3315 }
3316 }
3317
3318 /*
3319 * heap_truncate_one_rel
3320 *
3321 * This routine deletes all data within the specified relation.
3322 *
3323 * This is not transaction-safe, because the truncation is done immediately
3324 * and cannot be rolled back later. Caller is responsible for having
3325 * checked permissions etc, and must have obtained AccessExclusiveLock.
3326 */
3327 void
heap_truncate_one_rel(Relation rel)3328 heap_truncate_one_rel(Relation rel)
3329 {
3330 Oid toastrelid;
3331
3332 /*
3333 * Truncate the relation. Partitioned tables have no storage, so there is
3334 * nothing to do for them here.
3335 */
3336 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3337 return;
3338
3339 /* Truncate the underlying relation */
3340 table_relation_nontransactional_truncate(rel);
3341
3342 /* If the relation has indexes, truncate the indexes too */
3343 RelationTruncateIndexes(rel);
3344
3345 /* If there is a toast table, truncate that too */
3346 toastrelid = rel->rd_rel->reltoastrelid;
3347 if (OidIsValid(toastrelid))
3348 {
3349 Relation toastrel = table_open(toastrelid, AccessExclusiveLock);
3350
3351 table_relation_nontransactional_truncate(toastrel);
3352 RelationTruncateIndexes(toastrel);
3353 /* keep the lock... */
3354 table_close(toastrel, NoLock);
3355 }
3356 }
3357
3358 /*
3359 * heap_truncate_check_FKs
3360 * Check for foreign keys referencing a list of relations that
3361 * are to be truncated, and raise error if there are any
3362 *
3363 * We disallow such FKs (except self-referential ones) since the whole point
3364 * of TRUNCATE is to not scan the individual rows to be thrown away.
3365 *
3366 * This is split out so it can be shared by both implementations of truncate.
3367 * Caller should already hold a suitable lock on the relations.
3368 *
3369 * tempTables is only used to select an appropriate error message.
3370 */
3371 void
heap_truncate_check_FKs(List * relations,bool tempTables)3372 heap_truncate_check_FKs(List *relations, bool tempTables)
3373 {
3374 List *oids = NIL;
3375 List *dependents;
3376 ListCell *cell;
3377
3378 /*
3379 * Build a list of OIDs of the interesting relations.
3380 *
3381 * If a relation has no triggers, then it can neither have FKs nor be
3382 * referenced by a FK from another table, so we can ignore it. For
3383 * partitioned tables, FKs have no triggers, so we must include them
3384 * anyway.
3385 */
3386 foreach(cell, relations)
3387 {
3388 Relation rel = lfirst(cell);
3389
3390 if (rel->rd_rel->relhastriggers ||
3391 rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3392 oids = lappend_oid(oids, RelationGetRelid(rel));
3393 }
3394
3395 /*
3396 * Fast path: if no relation has triggers, none has FKs either.
3397 */
3398 if (oids == NIL)
3399 return;
3400
3401 /*
3402 * Otherwise, must scan pg_constraint. We make one pass with all the
3403 * relations considered; if this finds nothing, then all is well.
3404 */
3405 dependents = heap_truncate_find_FKs(oids);
3406 if (dependents == NIL)
3407 return;
3408
3409 /*
3410 * Otherwise we repeat the scan once per relation to identify a particular
3411 * pair of relations to complain about. This is pretty slow, but
3412 * performance shouldn't matter much in a failure path. The reason for
3413 * doing things this way is to ensure that the message produced is not
3414 * dependent on chance row locations within pg_constraint.
3415 */
3416 foreach(cell, oids)
3417 {
3418 Oid relid = lfirst_oid(cell);
3419 ListCell *cell2;
3420
3421 dependents = heap_truncate_find_FKs(list_make1_oid(relid));
3422
3423 foreach(cell2, dependents)
3424 {
3425 Oid relid2 = lfirst_oid(cell2);
3426
3427 if (!list_member_oid(oids, relid2))
3428 {
3429 char *relname = get_rel_name(relid);
3430 char *relname2 = get_rel_name(relid2);
3431
3432 if (tempTables)
3433 ereport(ERROR,
3434 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3435 errmsg("unsupported ON COMMIT and foreign key combination"),
3436 errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
3437 relname2, relname)));
3438 else
3439 ereport(ERROR,
3440 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3441 errmsg("cannot truncate a table referenced in a foreign key constraint"),
3442 errdetail("Table \"%s\" references \"%s\".",
3443 relname2, relname),
3444 errhint("Truncate table \"%s\" at the same time, "
3445 "or use TRUNCATE ... CASCADE.",
3446 relname2)));
3447 }
3448 }
3449 }
3450 }
3451
3452 /*
3453 * heap_truncate_find_FKs
3454 * Find relations having foreign keys referencing any of the given rels
3455 *
3456 * Input and result are both lists of relation OIDs. The result contains
3457 * no duplicates, does *not* include any rels that were already in the input
3458 * list, and is sorted in OID order. (The last property is enforced mainly
3459 * to guarantee consistent behavior in the regression tests; we don't want
3460 * behavior to change depending on chance locations of rows in pg_constraint.)
3461 *
3462 * Note: caller should already have appropriate lock on all rels mentioned
3463 * in relationIds. Since adding or dropping an FK requires exclusive lock
3464 * on both rels, this ensures that the answer will be stable.
3465 */
3466 List *
heap_truncate_find_FKs(List * relationIds)3467 heap_truncate_find_FKs(List *relationIds)
3468 {
3469 List *result = NIL;
3470 List *oids = list_copy(relationIds);
3471 List *parent_cons;
3472 ListCell *cell;
3473 ScanKeyData key;
3474 Relation fkeyRel;
3475 SysScanDesc fkeyScan;
3476 HeapTuple tuple;
3477 bool restart;
3478
3479 oids = list_copy(relationIds);
3480
3481 /*
3482 * Must scan pg_constraint. Right now, it is a seqscan because there is
3483 * no available index on confrelid.
3484 */
3485 fkeyRel = table_open(ConstraintRelationId, AccessShareLock);
3486
3487 restart:
3488 restart = false;
3489 parent_cons = NIL;
3490
3491 fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
3492 NULL, 0, NULL);
3493
3494 while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
3495 {
3496 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
3497
3498 /* Not a foreign key */
3499 if (con->contype != CONSTRAINT_FOREIGN)
3500 continue;
3501
3502 /* Not referencing one of our list of tables */
3503 if (!list_member_oid(oids, con->confrelid))
3504 continue;
3505
3506 /*
3507 * If this constraint has a parent constraint which we have not seen
3508 * yet, keep track of it for the second loop, below. Tracking parent
3509 * constraints allows us to climb up to the top-level level constraint
3510 * and look for all possible relations referencing the partitioned
3511 * table.
3512 */
3513 if (OidIsValid(con->conparentid) &&
3514 !list_member_oid(parent_cons, con->conparentid))
3515 parent_cons = lappend_oid(parent_cons, con->conparentid);
3516
3517 /*
3518 * Add referencer to result, unless present in input list. (Don't
3519 * worry about dupes: we'll fix that below).
3520 */
3521 if (!list_member_oid(relationIds, con->conrelid))
3522 result = lappend_oid(result, con->conrelid);
3523 }
3524
3525 systable_endscan(fkeyScan);
3526
3527 /*
3528 * Process each parent constraint we found to add the list of referenced
3529 * relations by them to the oids list. If we do add any new such
3530 * relations, redo the first loop above. Also, if we see that the parent
3531 * constraint in turn has a parent, add that so that we process all
3532 * relations in a single additional pass.
3533 */
3534 foreach(cell, parent_cons)
3535 {
3536 Oid parent = lfirst_oid(cell);
3537
3538 ScanKeyInit(&key,
3539 Anum_pg_constraint_oid,
3540 BTEqualStrategyNumber, F_OIDEQ,
3541 ObjectIdGetDatum(parent));
3542
3543 fkeyScan = systable_beginscan(fkeyRel, ConstraintOidIndexId,
3544 true, NULL, 1, &key);
3545
3546 tuple = systable_getnext(fkeyScan);
3547 if (HeapTupleIsValid(tuple))
3548 {
3549 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
3550
3551 /*
3552 * pg_constraint rows always appear for partitioned hierarchies
3553 * this way: on the each side of the constraint, one row appears
3554 * for each partition that points to the top-most table on the
3555 * other side.
3556 *
3557 * Because of this arrangement, we can correctly catch all
3558 * relevant relations by adding to 'parent_cons' all rows with
3559 * valid conparentid, and to the 'oids' list all rows with a zero
3560 * conparentid. If any oids are added to 'oids', redo the first
3561 * loop above by setting 'restart'.
3562 */
3563 if (OidIsValid(con->conparentid))
3564 parent_cons = list_append_unique_oid(parent_cons,
3565 con->conparentid);
3566 else if (!list_member_oid(oids, con->confrelid))
3567 {
3568 oids = lappend_oid(oids, con->confrelid);
3569 restart = true;
3570 }
3571 }
3572
3573 systable_endscan(fkeyScan);
3574 }
3575
3576 list_free(parent_cons);
3577 if (restart)
3578 goto restart;
3579
3580 table_close(fkeyRel, AccessShareLock);
3581 list_free(oids);
3582
3583 /* Now sort and de-duplicate the result list */
3584 list_sort(result, list_oid_cmp);
3585 list_deduplicate_oid(result);
3586
3587 return result;
3588 }
3589
3590 /*
3591 * StorePartitionKey
3592 * Store information about the partition key rel into the catalog
3593 */
3594 void
StorePartitionKey(Relation rel,char strategy,int16 partnatts,AttrNumber * partattrs,List * partexprs,Oid * partopclass,Oid * partcollation)3595 StorePartitionKey(Relation rel,
3596 char strategy,
3597 int16 partnatts,
3598 AttrNumber *partattrs,
3599 List *partexprs,
3600 Oid *partopclass,
3601 Oid *partcollation)
3602 {
3603 int i;
3604 int2vector *partattrs_vec;
3605 oidvector *partopclass_vec;
3606 oidvector *partcollation_vec;
3607 Datum partexprDatum;
3608 Relation pg_partitioned_table;
3609 HeapTuple tuple;
3610 Datum values[Natts_pg_partitioned_table];
3611 bool nulls[Natts_pg_partitioned_table];
3612 ObjectAddress myself;
3613 ObjectAddress referenced;
3614
3615 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
3616
3617 /* Copy the partition attribute numbers, opclass OIDs into arrays */
3618 partattrs_vec = buildint2vector(partattrs, partnatts);
3619 partopclass_vec = buildoidvector(partopclass, partnatts);
3620 partcollation_vec = buildoidvector(partcollation, partnatts);
3621
3622 /* Convert the expressions (if any) to a text datum */
3623 if (partexprs)
3624 {
3625 char *exprString;
3626
3627 exprString = nodeToString(partexprs);
3628 partexprDatum = CStringGetTextDatum(exprString);
3629 pfree(exprString);
3630 }
3631 else
3632 partexprDatum = (Datum) 0;
3633
3634 pg_partitioned_table = table_open(PartitionedRelationId, RowExclusiveLock);
3635
3636 MemSet(nulls, false, sizeof(nulls));
3637
3638 /* Only this can ever be NULL */
3639 if (!partexprDatum)
3640 nulls[Anum_pg_partitioned_table_partexprs - 1] = true;
3641
3642 values[Anum_pg_partitioned_table_partrelid - 1] = ObjectIdGetDatum(RelationGetRelid(rel));
3643 values[Anum_pg_partitioned_table_partstrat - 1] = CharGetDatum(strategy);
3644 values[Anum_pg_partitioned_table_partnatts - 1] = Int16GetDatum(partnatts);
3645 values[Anum_pg_partitioned_table_partdefid - 1] = ObjectIdGetDatum(InvalidOid);
3646 values[Anum_pg_partitioned_table_partattrs - 1] = PointerGetDatum(partattrs_vec);
3647 values[Anum_pg_partitioned_table_partclass - 1] = PointerGetDatum(partopclass_vec);
3648 values[Anum_pg_partitioned_table_partcollation - 1] = PointerGetDatum(partcollation_vec);
3649 values[Anum_pg_partitioned_table_partexprs - 1] = partexprDatum;
3650
3651 tuple = heap_form_tuple(RelationGetDescr(pg_partitioned_table), values, nulls);
3652
3653 CatalogTupleInsert(pg_partitioned_table, tuple);
3654 table_close(pg_partitioned_table, RowExclusiveLock);
3655
3656 /* Mark this relation as dependent on a few things as follows */
3657 myself.classId = RelationRelationId;
3658 myself.objectId = RelationGetRelid(rel);
3659 myself.objectSubId = 0;
3660
3661 /* Operator class and collation per key column */
3662 for (i = 0; i < partnatts; i++)
3663 {
3664 referenced.classId = OperatorClassRelationId;
3665 referenced.objectId = partopclass[i];
3666 referenced.objectSubId = 0;
3667
3668 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3669
3670 /* The default collation is pinned, so don't bother recording it */
3671 if (OidIsValid(partcollation[i]) &&
3672 partcollation[i] != DEFAULT_COLLATION_OID)
3673 {
3674 referenced.classId = CollationRelationId;
3675 referenced.objectId = partcollation[i];
3676 referenced.objectSubId = 0;
3677
3678 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3679 }
3680 }
3681
3682 /*
3683 * The partitioning columns are made internally dependent on the table,
3684 * because we cannot drop any of them without dropping the whole table.
3685 * (ATExecDropColumn independently enforces that, but it's not bulletproof
3686 * so we need the dependencies too.)
3687 */
3688 for (i = 0; i < partnatts; i++)
3689 {
3690 if (partattrs[i] == 0)
3691 continue; /* ignore expressions here */
3692
3693 referenced.classId = RelationRelationId;
3694 referenced.objectId = RelationGetRelid(rel);
3695 referenced.objectSubId = partattrs[i];
3696
3697 recordDependencyOn(&referenced, &myself, DEPENDENCY_INTERNAL);
3698 }
3699
3700 /*
3701 * Also consider anything mentioned in partition expressions. External
3702 * references (e.g. functions) get NORMAL dependencies. Table columns
3703 * mentioned in the expressions are handled the same as plain partitioning
3704 * columns, i.e. they become internally dependent on the whole table.
3705 */
3706 if (partexprs)
3707 recordDependencyOnSingleRelExpr(&myself,
3708 (Node *) partexprs,
3709 RelationGetRelid(rel),
3710 DEPENDENCY_NORMAL,
3711 DEPENDENCY_INTERNAL,
3712 true /* reverse the self-deps */ );
3713
3714 /*
3715 * We must invalidate the relcache so that the next
3716 * CommandCounterIncrement() will cause the same to be rebuilt using the
3717 * information in just created catalog entry.
3718 */
3719 CacheInvalidateRelcache(rel);
3720 }
3721
3722 /*
3723 * RemovePartitionKeyByRelId
3724 * Remove pg_partitioned_table entry for a relation
3725 */
3726 void
RemovePartitionKeyByRelId(Oid relid)3727 RemovePartitionKeyByRelId(Oid relid)
3728 {
3729 Relation rel;
3730 HeapTuple tuple;
3731
3732 rel = table_open(PartitionedRelationId, RowExclusiveLock);
3733
3734 tuple = SearchSysCache1(PARTRELID, ObjectIdGetDatum(relid));
3735 if (!HeapTupleIsValid(tuple))
3736 elog(ERROR, "cache lookup failed for partition key of relation %u",
3737 relid);
3738
3739 CatalogTupleDelete(rel, &tuple->t_self);
3740
3741 ReleaseSysCache(tuple);
3742 table_close(rel, RowExclusiveLock);
3743 }
3744
3745 /*
3746 * StorePartitionBound
3747 * Update pg_class tuple of rel to store the partition bound and set
3748 * relispartition to true
3749 *
3750 * If this is the default partition, also update the default partition OID in
3751 * pg_partitioned_table.
3752 *
3753 * Also, invalidate the parent's relcache, so that the next rebuild will load
3754 * the new partition's info into its partition descriptor. If there is a
3755 * default partition, we must invalidate its relcache entry as well.
3756 */
3757 void
StorePartitionBound(Relation rel,Relation parent,PartitionBoundSpec * bound)3758 StorePartitionBound(Relation rel, Relation parent, PartitionBoundSpec *bound)
3759 {
3760 Relation classRel;
3761 HeapTuple tuple,
3762 newtuple;
3763 Datum new_val[Natts_pg_class];
3764 bool new_null[Natts_pg_class],
3765 new_repl[Natts_pg_class];
3766 Oid defaultPartOid;
3767
3768 /* Update pg_class tuple */
3769 classRel = table_open(RelationRelationId, RowExclusiveLock);
3770 tuple = SearchSysCacheCopy1(RELOID,
3771 ObjectIdGetDatum(RelationGetRelid(rel)));
3772 if (!HeapTupleIsValid(tuple))
3773 elog(ERROR, "cache lookup failed for relation %u",
3774 RelationGetRelid(rel));
3775
3776 #ifdef USE_ASSERT_CHECKING
3777 {
3778 Form_pg_class classForm;
3779 bool isnull;
3780
3781 classForm = (Form_pg_class) GETSTRUCT(tuple);
3782 Assert(!classForm->relispartition);
3783 (void) SysCacheGetAttr(RELOID, tuple, Anum_pg_class_relpartbound,
3784 &isnull);
3785 Assert(isnull);
3786 }
3787 #endif
3788
3789 /* Fill in relpartbound value */
3790 memset(new_val, 0, sizeof(new_val));
3791 memset(new_null, false, sizeof(new_null));
3792 memset(new_repl, false, sizeof(new_repl));
3793 new_val[Anum_pg_class_relpartbound - 1] = CStringGetTextDatum(nodeToString(bound));
3794 new_null[Anum_pg_class_relpartbound - 1] = false;
3795 new_repl[Anum_pg_class_relpartbound - 1] = true;
3796 newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
3797 new_val, new_null, new_repl);
3798 /* Also set the flag */
3799 ((Form_pg_class) GETSTRUCT(newtuple))->relispartition = true;
3800 CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
3801 heap_freetuple(newtuple);
3802 table_close(classRel, RowExclusiveLock);
3803
3804 /*
3805 * If we're storing bounds for the default partition, update
3806 * pg_partitioned_table too.
3807 */
3808 if (bound->is_default)
3809 update_default_partition_oid(RelationGetRelid(parent),
3810 RelationGetRelid(rel));
3811
3812 /* Make these updates visible */
3813 CommandCounterIncrement();
3814
3815 /*
3816 * The partition constraint for the default partition depends on the
3817 * partition bounds of every other partition, so we must invalidate the
3818 * relcache entry for that partition every time a partition is added or
3819 * removed.
3820 */
3821 defaultPartOid = get_default_oid_from_partdesc(RelationGetPartitionDesc(parent));
3822 if (OidIsValid(defaultPartOid))
3823 CacheInvalidateRelcacheByRelid(defaultPartOid);
3824
3825 CacheInvalidateRelcache(parent);
3826 }
3827