1 /*-------------------------------------------------------------------------
2 *
3 * heap.c
4 * code to create and destroy POSTGRES heap relations
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/catalog/heap.c
12 *
13 *
14 * INTERFACE ROUTINES
15 * heap_create() - Create an uncataloged heap relation
16 * heap_create_with_catalog() - Create a cataloged relation
17 * heap_drop_with_catalog() - Removes named relation from catalogs
18 *
19 * NOTES
20 * this code taken from access/heap/create.c, which contains
21 * the old heap_create_with_catalog, amcreate, and amdestroy.
22 * those routines will soon call these routines using the function
23 * manager,
24 * just like the poorly named "NewXXX" routines do. The
25 * "New" routines are all going to die soon, once and for all!
26 * -cim 1/13/91
27 *
28 *-------------------------------------------------------------------------
29 */
30 #include "postgres.h"
31
32 #include "access/genam.h"
33 #include "access/htup_details.h"
34 #include "access/multixact.h"
35 #include "access/relation.h"
36 #include "access/sysattr.h"
37 #include "access/table.h"
38 #include "access/tableam.h"
39 #include "access/transam.h"
40 #include "access/xact.h"
41 #include "access/xlog.h"
42 #include "catalog/binary_upgrade.h"
43 #include "catalog/catalog.h"
44 #include "catalog/dependency.h"
45 #include "catalog/heap.h"
46 #include "catalog/index.h"
47 #include "catalog/objectaccess.h"
48 #include "catalog/partition.h"
49 #include "catalog/pg_am.h"
50 #include "catalog/pg_attrdef.h"
51 #include "catalog/pg_collation.h"
52 #include "catalog/pg_constraint.h"
53 #include "catalog/pg_foreign_table.h"
54 #include "catalog/pg_inherits.h"
55 #include "catalog/pg_namespace.h"
56 #include "catalog/pg_opclass.h"
57 #include "catalog/pg_partitioned_table.h"
58 #include "catalog/pg_statistic.h"
59 #include "catalog/pg_subscription_rel.h"
60 #include "catalog/pg_tablespace.h"
61 #include "catalog/pg_type.h"
62 #include "catalog/storage.h"
63 #include "catalog/storage_xlog.h"
64 #include "commands/tablecmds.h"
65 #include "commands/typecmds.h"
66 #include "executor/executor.h"
67 #include "miscadmin.h"
68 #include "nodes/nodeFuncs.h"
69 #include "optimizer/optimizer.h"
70 #include "parser/parse_coerce.h"
71 #include "parser/parse_collate.h"
72 #include "parser/parse_expr.h"
73 #include "parser/parse_relation.h"
74 #include "parser/parsetree.h"
75 #include "partitioning/partdesc.h"
76 #include "storage/lmgr.h"
77 #include "storage/predicate.h"
78 #include "storage/smgr.h"
79 #include "utils/acl.h"
80 #include "utils/builtins.h"
81 #include "utils/datum.h"
82 #include "utils/fmgroids.h"
83 #include "utils/inval.h"
84 #include "utils/lsyscache.h"
85 #include "utils/partcache.h"
86 #include "utils/rel.h"
87 #include "utils/ruleutils.h"
88 #include "utils/snapmgr.h"
89 #include "utils/syscache.h"
90
91
92 /* Potentially set by pg_upgrade_support functions */
93 Oid binary_upgrade_next_heap_pg_class_oid = InvalidOid;
94 Oid binary_upgrade_next_toast_pg_class_oid = InvalidOid;
95
96 static void AddNewRelationTuple(Relation pg_class_desc,
97 Relation new_rel_desc,
98 Oid new_rel_oid,
99 Oid new_type_oid,
100 Oid reloftype,
101 Oid relowner,
102 char relkind,
103 TransactionId relfrozenxid,
104 TransactionId relminmxid,
105 Datum relacl,
106 Datum reloptions);
107 static ObjectAddress AddNewRelationType(const char *typeName,
108 Oid typeNamespace,
109 Oid new_rel_oid,
110 char new_rel_kind,
111 Oid ownerid,
112 Oid new_row_type,
113 Oid new_array_type);
114 static void RelationRemoveInheritance(Oid relid);
115 static Oid StoreRelCheck(Relation rel, const char *ccname, Node *expr,
116 bool is_validated, bool is_local, int inhcount,
117 bool is_no_inherit, bool is_internal);
118 static void StoreConstraints(Relation rel, List *cooked_constraints,
119 bool is_internal);
120 static bool MergeWithExistingConstraint(Relation rel, const char *ccname, Node *expr,
121 bool allow_merge, bool is_local,
122 bool is_initially_valid,
123 bool is_no_inherit);
124 static void SetRelationNumChecks(Relation rel, int numchecks);
125 static Node *cookConstraint(ParseState *pstate,
126 Node *raw_constraint,
127 char *relname);
128 static List *insert_ordered_unique_oid(List *list, Oid datum);
129
130
131 /* ----------------------------------------------------------------
132 * XXX UGLY HARD CODED BADNESS FOLLOWS XXX
133 *
134 * these should all be moved to someplace in the lib/catalog
135 * module, if not obliterated first.
136 * ----------------------------------------------------------------
137 */
138
139
140 /*
141 * Note:
142 * Should the system special case these attributes in the future?
143 * Advantage: consume much less space in the ATTRIBUTE relation.
144 * Disadvantage: special cases will be all over the place.
145 */
146
147 /*
148 * The initializers below do not include trailing variable length fields,
149 * but that's OK - we're never going to reference anything beyond the
150 * fixed-size portion of the structure anyway.
151 */
152
153 static const FormData_pg_attribute a1 = {
154 .attname = {"ctid"},
155 .atttypid = TIDOID,
156 .attlen = sizeof(ItemPointerData),
157 .attnum = SelfItemPointerAttributeNumber,
158 .attcacheoff = -1,
159 .atttypmod = -1,
160 .attbyval = false,
161 .attstorage = 'p',
162 .attalign = 's',
163 .attnotnull = true,
164 .attislocal = true,
165 };
166
167 static const FormData_pg_attribute a2 = {
168 .attname = {"xmin"},
169 .atttypid = XIDOID,
170 .attlen = sizeof(TransactionId),
171 .attnum = MinTransactionIdAttributeNumber,
172 .attcacheoff = -1,
173 .atttypmod = -1,
174 .attbyval = true,
175 .attstorage = 'p',
176 .attalign = 'i',
177 .attnotnull = true,
178 .attislocal = true,
179 };
180
181 static const FormData_pg_attribute a3 = {
182 .attname = {"cmin"},
183 .atttypid = CIDOID,
184 .attlen = sizeof(CommandId),
185 .attnum = MinCommandIdAttributeNumber,
186 .attcacheoff = -1,
187 .atttypmod = -1,
188 .attbyval = true,
189 .attstorage = 'p',
190 .attalign = 'i',
191 .attnotnull = true,
192 .attislocal = true,
193 };
194
195 static const FormData_pg_attribute a4 = {
196 .attname = {"xmax"},
197 .atttypid = XIDOID,
198 .attlen = sizeof(TransactionId),
199 .attnum = MaxTransactionIdAttributeNumber,
200 .attcacheoff = -1,
201 .atttypmod = -1,
202 .attbyval = true,
203 .attstorage = 'p',
204 .attalign = 'i',
205 .attnotnull = true,
206 .attislocal = true,
207 };
208
209 static const FormData_pg_attribute a5 = {
210 .attname = {"cmax"},
211 .atttypid = CIDOID,
212 .attlen = sizeof(CommandId),
213 .attnum = MaxCommandIdAttributeNumber,
214 .attcacheoff = -1,
215 .atttypmod = -1,
216 .attbyval = true,
217 .attstorage = 'p',
218 .attalign = 'i',
219 .attnotnull = true,
220 .attislocal = true,
221 };
222
223 /*
224 * We decided to call this attribute "tableoid" rather than say
225 * "classoid" on the basis that in the future there may be more than one
226 * table of a particular class/type. In any case table is still the word
227 * used in SQL.
228 */
229 static const FormData_pg_attribute a6 = {
230 .attname = {"tableoid"},
231 .atttypid = OIDOID,
232 .attlen = sizeof(Oid),
233 .attnum = TableOidAttributeNumber,
234 .attcacheoff = -1,
235 .atttypmod = -1,
236 .attbyval = true,
237 .attstorage = 'p',
238 .attalign = 'i',
239 .attnotnull = true,
240 .attislocal = true,
241 };
242
243 static const FormData_pg_attribute *SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6};
244
245 /*
246 * This function returns a Form_pg_attribute pointer for a system attribute.
247 * Note that we elog if the presented attno is invalid, which would only
248 * happen if there's a problem upstream.
249 */
250 const FormData_pg_attribute *
SystemAttributeDefinition(AttrNumber attno)251 SystemAttributeDefinition(AttrNumber attno)
252 {
253 if (attno >= 0 || attno < -(int) lengthof(SysAtt))
254 elog(ERROR, "invalid system attribute number %d", attno);
255 return SysAtt[-attno - 1];
256 }
257
258 /*
259 * If the given name is a system attribute name, return a Form_pg_attribute
260 * pointer for a prototype definition. If not, return NULL.
261 */
262 const FormData_pg_attribute *
SystemAttributeByName(const char * attname)263 SystemAttributeByName(const char *attname)
264 {
265 int j;
266
267 for (j = 0; j < (int) lengthof(SysAtt); j++)
268 {
269 const FormData_pg_attribute *att = SysAtt[j];
270
271 if (strcmp(NameStr(att->attname), attname) == 0)
272 return att;
273 }
274
275 return NULL;
276 }
277
278
279 /* ----------------------------------------------------------------
280 * XXX END OF UGLY HARD CODED BADNESS XXX
281 * ---------------------------------------------------------------- */
282
283
284 /* ----------------------------------------------------------------
285 * heap_create - Create an uncataloged heap relation
286 *
287 * Note API change: the caller must now always provide the OID
288 * to use for the relation. The relfilenode may (and, normally,
289 * should) be left unspecified.
290 *
291 * rel->rd_rel is initialized by RelationBuildLocalRelation,
292 * and is mostly zeroes at return.
293 * ----------------------------------------------------------------
294 */
295 Relation
heap_create(const char * relname,Oid relnamespace,Oid reltablespace,Oid relid,Oid relfilenode,Oid accessmtd,TupleDesc tupDesc,char relkind,char relpersistence,bool shared_relation,bool mapped_relation,bool allow_system_table_mods,TransactionId * relfrozenxid,MultiXactId * relminmxid)296 heap_create(const char *relname,
297 Oid relnamespace,
298 Oid reltablespace,
299 Oid relid,
300 Oid relfilenode,
301 Oid accessmtd,
302 TupleDesc tupDesc,
303 char relkind,
304 char relpersistence,
305 bool shared_relation,
306 bool mapped_relation,
307 bool allow_system_table_mods,
308 TransactionId *relfrozenxid,
309 MultiXactId *relminmxid)
310 {
311 bool create_storage;
312 Relation rel;
313
314 /* The caller must have provided an OID for the relation. */
315 Assert(OidIsValid(relid));
316
317 /*
318 * Don't allow creating relations in pg_catalog directly, even though it
319 * is allowed to move user defined relations there. Semantics with search
320 * paths including pg_catalog are too confusing for now.
321 *
322 * But allow creating indexes on relations in pg_catalog even if
323 * allow_system_table_mods = off, upper layers already guarantee it's on a
324 * user defined relation, not a system one.
325 */
326 if (!allow_system_table_mods &&
327 ((IsCatalogNamespace(relnamespace) && relkind != RELKIND_INDEX) ||
328 IsToastNamespace(relnamespace)) &&
329 IsNormalProcessingMode())
330 ereport(ERROR,
331 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
332 errmsg("permission denied to create \"%s.%s\"",
333 get_namespace_name(relnamespace), relname),
334 errdetail("System catalog modifications are currently disallowed.")));
335
336 *relfrozenxid = InvalidTransactionId;
337 *relminmxid = InvalidMultiXactId;
338
339 /* Handle reltablespace for specific relkinds. */
340 switch (relkind)
341 {
342 case RELKIND_VIEW:
343 case RELKIND_COMPOSITE_TYPE:
344 case RELKIND_FOREIGN_TABLE:
345
346 /*
347 * Force reltablespace to zero if the relation has no physical
348 * storage. This is mainly just for cleanliness' sake.
349 *
350 * Partitioned tables and indexes don't have physical storage
351 * either, but we want to keep their tablespace settings so that
352 * their children can inherit it.
353 */
354 reltablespace = InvalidOid;
355 break;
356
357 case RELKIND_SEQUENCE:
358
359 /*
360 * Force reltablespace to zero for sequences, since we don't
361 * support moving them around into different tablespaces.
362 */
363 reltablespace = InvalidOid;
364 break;
365 default:
366 break;
367 }
368
369 /*
370 * Decide whether to create storage. If caller passed a valid relfilenode,
371 * storage is already created, so don't do it here. Also don't create it
372 * for relkinds without physical storage.
373 */
374 if (!RELKIND_HAS_STORAGE(relkind) || OidIsValid(relfilenode))
375 create_storage = false;
376 else
377 {
378 create_storage = true;
379 relfilenode = relid;
380 }
381
382 /*
383 * Never allow a pg_class entry to explicitly specify the database's
384 * default tablespace in reltablespace; force it to zero instead. This
385 * ensures that if the database is cloned with a different default
386 * tablespace, the pg_class entry will still match where CREATE DATABASE
387 * will put the physically copied relation.
388 *
389 * Yes, this is a bit of a hack.
390 */
391 if (reltablespace == MyDatabaseTableSpace)
392 reltablespace = InvalidOid;
393
394 /*
395 * build the relcache entry.
396 */
397 rel = RelationBuildLocalRelation(relname,
398 relnamespace,
399 tupDesc,
400 relid,
401 accessmtd,
402 relfilenode,
403 reltablespace,
404 shared_relation,
405 mapped_relation,
406 relpersistence,
407 relkind);
408
409 /*
410 * Have the storage manager create the relation's disk file, if needed.
411 *
412 * For relations the callback creates both the main and the init fork, for
413 * indexes only the main fork is created. The other forks will be created
414 * on demand.
415 */
416 if (create_storage)
417 {
418 RelationOpenSmgr(rel);
419
420 switch (rel->rd_rel->relkind)
421 {
422 case RELKIND_VIEW:
423 case RELKIND_COMPOSITE_TYPE:
424 case RELKIND_FOREIGN_TABLE:
425 case RELKIND_PARTITIONED_TABLE:
426 case RELKIND_PARTITIONED_INDEX:
427 Assert(false);
428 break;
429
430 case RELKIND_INDEX:
431 case RELKIND_SEQUENCE:
432 RelationCreateStorage(rel->rd_node, relpersistence);
433 break;
434
435 case RELKIND_RELATION:
436 case RELKIND_TOASTVALUE:
437 case RELKIND_MATVIEW:
438 table_relation_set_new_filenode(rel, &rel->rd_node,
439 relpersistence,
440 relfrozenxid, relminmxid);
441 break;
442 }
443 }
444
445 /*
446 * If a tablespace is specified, removal of that tablespace is normally
447 * protected by the existence of a physical file; but for relations with
448 * no files, add a pg_shdepend entry to account for that.
449 */
450 if (!create_storage && reltablespace != InvalidOid)
451 recordDependencyOnTablespace(RelationRelationId, relid,
452 reltablespace);
453
454 return rel;
455 }
456
457 /* ----------------------------------------------------------------
458 * heap_create_with_catalog - Create a cataloged relation
459 *
460 * this is done in multiple steps:
461 *
462 * 1) CheckAttributeNamesTypes() is used to make certain the tuple
463 * descriptor contains a valid set of attribute names and types
464 *
465 * 2) pg_class is opened and get_relname_relid()
466 * performs a scan to ensure that no relation with the
467 * same name already exists.
468 *
469 * 3) heap_create() is called to create the new relation on disk.
470 *
471 * 4) TypeCreate() is called to define a new type corresponding
472 * to the new relation.
473 *
474 * 5) AddNewRelationTuple() is called to register the
475 * relation in pg_class.
476 *
477 * 6) AddNewAttributeTuples() is called to register the
478 * new relation's schema in pg_attribute.
479 *
480 * 7) StoreConstraints is called () - vadim 08/22/97
481 *
482 * 8) the relations are closed and the new relation's oid
483 * is returned.
484 *
485 * ----------------------------------------------------------------
486 */
487
488 /* --------------------------------
489 * CheckAttributeNamesTypes
490 *
491 * this is used to make certain the tuple descriptor contains a
492 * valid set of attribute names and datatypes. a problem simply
493 * generates ereport(ERROR) which aborts the current transaction.
494 *
495 * relkind is the relkind of the relation to be created.
496 * flags controls which datatypes are allowed, cf CheckAttributeType.
497 * --------------------------------
498 */
499 void
CheckAttributeNamesTypes(TupleDesc tupdesc,char relkind,int flags)500 CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
501 int flags)
502 {
503 int i;
504 int j;
505 int natts = tupdesc->natts;
506
507 /* Sanity check on column count */
508 if (natts < 0 || natts > MaxHeapAttributeNumber)
509 ereport(ERROR,
510 (errcode(ERRCODE_TOO_MANY_COLUMNS),
511 errmsg("tables can have at most %d columns",
512 MaxHeapAttributeNumber)));
513
514 /*
515 * first check for collision with system attribute names
516 *
517 * Skip this for a view or type relation, since those don't have system
518 * attributes.
519 */
520 if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
521 {
522 for (i = 0; i < natts; i++)
523 {
524 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
525
526 if (SystemAttributeByName(NameStr(attr->attname)) != NULL)
527 ereport(ERROR,
528 (errcode(ERRCODE_DUPLICATE_COLUMN),
529 errmsg("column name \"%s\" conflicts with a system column name",
530 NameStr(attr->attname))));
531 }
532 }
533
534 /*
535 * next check for repeated attribute names
536 */
537 for (i = 1; i < natts; i++)
538 {
539 for (j = 0; j < i; j++)
540 {
541 if (strcmp(NameStr(TupleDescAttr(tupdesc, j)->attname),
542 NameStr(TupleDescAttr(tupdesc, i)->attname)) == 0)
543 ereport(ERROR,
544 (errcode(ERRCODE_DUPLICATE_COLUMN),
545 errmsg("column name \"%s\" specified more than once",
546 NameStr(TupleDescAttr(tupdesc, j)->attname))));
547 }
548 }
549
550 /*
551 * next check the attribute types
552 */
553 for (i = 0; i < natts; i++)
554 {
555 CheckAttributeType(NameStr(TupleDescAttr(tupdesc, i)->attname),
556 TupleDescAttr(tupdesc, i)->atttypid,
557 TupleDescAttr(tupdesc, i)->attcollation,
558 NIL, /* assume we're creating a new rowtype */
559 flags);
560 }
561 }
562
563 /* --------------------------------
564 * CheckAttributeType
565 *
566 * Verify that the proposed datatype of an attribute is legal.
567 * This is needed mainly because there are types (and pseudo-types)
568 * in the catalogs that we do not support as elements of real tuples.
569 * We also check some other properties required of a table column.
570 *
571 * If the attribute is being proposed for addition to an existing table or
572 * composite type, pass a one-element list of the rowtype OID as
573 * containing_rowtypes. When checking a to-be-created rowtype, it's
574 * sufficient to pass NIL, because there could not be any recursive reference
575 * to a not-yet-existing rowtype.
576 *
577 * flags is a bitmask controlling which datatypes we allow. For the most
578 * part, pseudo-types are disallowed as attribute types, but there are some
579 * exceptions: ANYARRAYOID, RECORDOID, and RECORDARRAYOID can be allowed
580 * in some cases. (This works because values of those type classes are
581 * self-identifying to some extent. However, RECORDOID and RECORDARRAYOID
582 * are reliably identifiable only within a session, since the identity info
583 * may use a typmod that is only locally assigned. The caller is expected
584 * to know whether these cases are safe.)
585 *
586 * flags can also control the phrasing of the error messages. If
587 * CHKATYPE_IS_PARTKEY is specified, "attname" should be a partition key
588 * column number as text, not a real column name.
589 * --------------------------------
590 */
591 void
CheckAttributeType(const char * attname,Oid atttypid,Oid attcollation,List * containing_rowtypes,int flags)592 CheckAttributeType(const char *attname,
593 Oid atttypid, Oid attcollation,
594 List *containing_rowtypes,
595 int flags)
596 {
597 char att_typtype = get_typtype(atttypid);
598 Oid att_typelem;
599
600 if (att_typtype == TYPTYPE_PSEUDO)
601 {
602 /*
603 * We disallow pseudo-type columns, with the exception of ANYARRAY,
604 * RECORD, and RECORD[] when the caller says that those are OK.
605 *
606 * We don't need to worry about recursive containment for RECORD and
607 * RECORD[] because (a) no named composite type should be allowed to
608 * contain those, and (b) two "anonymous" record types couldn't be
609 * considered to be the same type, so infinite recursion isn't
610 * possible.
611 */
612 if (!((atttypid == ANYARRAYOID && (flags & CHKATYPE_ANYARRAY)) ||
613 (atttypid == RECORDOID && (flags & CHKATYPE_ANYRECORD)) ||
614 (atttypid == RECORDARRAYOID && (flags & CHKATYPE_ANYRECORD))))
615 {
616 if (flags & CHKATYPE_IS_PARTKEY)
617 ereport(ERROR,
618 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
619 /* translator: first %s is an integer not a name */
620 errmsg("partition key column %s has pseudo-type %s",
621 attname, format_type_be(atttypid))));
622 else
623 ereport(ERROR,
624 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
625 errmsg("column \"%s\" has pseudo-type %s",
626 attname, format_type_be(atttypid))));
627 }
628 }
629 else if (att_typtype == TYPTYPE_DOMAIN)
630 {
631 /*
632 * If it's a domain, recurse to check its base type.
633 */
634 CheckAttributeType(attname, getBaseType(atttypid), attcollation,
635 containing_rowtypes,
636 flags);
637 }
638 else if (att_typtype == TYPTYPE_COMPOSITE)
639 {
640 /*
641 * For a composite type, recurse into its attributes.
642 */
643 Relation relation;
644 TupleDesc tupdesc;
645 int i;
646
647 /*
648 * Check for self-containment. Eventually we might be able to allow
649 * this (just return without complaint, if so) but it's not clear how
650 * many other places would require anti-recursion defenses before it
651 * would be safe to allow tables to contain their own rowtype.
652 */
653 if (list_member_oid(containing_rowtypes, atttypid))
654 ereport(ERROR,
655 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
656 errmsg("composite type %s cannot be made a member of itself",
657 format_type_be(atttypid))));
658
659 containing_rowtypes = lcons_oid(atttypid, containing_rowtypes);
660
661 relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);
662
663 tupdesc = RelationGetDescr(relation);
664
665 for (i = 0; i < tupdesc->natts; i++)
666 {
667 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
668
669 if (attr->attisdropped)
670 continue;
671 CheckAttributeType(NameStr(attr->attname),
672 attr->atttypid, attr->attcollation,
673 containing_rowtypes,
674 flags & ~CHKATYPE_IS_PARTKEY);
675 }
676
677 relation_close(relation, AccessShareLock);
678
679 containing_rowtypes = list_delete_first(containing_rowtypes);
680 }
681 else if (att_typtype == TYPTYPE_RANGE)
682 {
683 /*
684 * If it's a range, recurse to check its subtype.
685 */
686 CheckAttributeType(attname, get_range_subtype(atttypid),
687 get_range_collation(atttypid),
688 containing_rowtypes,
689 flags);
690 }
691 else if (OidIsValid((att_typelem = get_element_type(atttypid))))
692 {
693 /*
694 * Must recurse into array types, too, in case they are composite.
695 */
696 CheckAttributeType(attname, att_typelem, attcollation,
697 containing_rowtypes,
698 flags);
699 }
700
701 /*
702 * This might not be strictly invalid per SQL standard, but it is pretty
703 * useless, and it cannot be dumped, so we must disallow it.
704 */
705 if (!OidIsValid(attcollation) && type_is_collatable(atttypid))
706 {
707 if (flags & CHKATYPE_IS_PARTKEY)
708 ereport(ERROR,
709 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
710 /* translator: first %s is an integer not a name */
711 errmsg("no collation was derived for partition key column %s with collatable type %s",
712 attname, format_type_be(atttypid)),
713 errhint("Use the COLLATE clause to set the collation explicitly.")));
714 else
715 ereport(ERROR,
716 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
717 errmsg("no collation was derived for column \"%s\" with collatable type %s",
718 attname, format_type_be(atttypid)),
719 errhint("Use the COLLATE clause to set the collation explicitly.")));
720 }
721 }
722
723 /*
724 * InsertPgAttributeTuple
725 * Construct and insert a new tuple in pg_attribute.
726 *
727 * Caller has already opened and locked pg_attribute. new_attribute is the
728 * attribute to insert. attcacheoff is always initialized to -1, attacl and
729 * attoptions are always initialized to NULL.
730 *
731 * indstate is the index state for CatalogTupleInsertWithInfo. It can be
732 * passed as NULL, in which case we'll fetch the necessary info. (Don't do
733 * this when inserting multiple attributes, because it's a tad more
734 * expensive.)
735 */
736 void
InsertPgAttributeTuple(Relation pg_attribute_rel,Form_pg_attribute new_attribute,CatalogIndexState indstate)737 InsertPgAttributeTuple(Relation pg_attribute_rel,
738 Form_pg_attribute new_attribute,
739 CatalogIndexState indstate)
740 {
741 Datum values[Natts_pg_attribute];
742 bool nulls[Natts_pg_attribute];
743 HeapTuple tup;
744
745 /* This is a tad tedious, but way cleaner than what we used to do... */
746 memset(values, 0, sizeof(values));
747 memset(nulls, false, sizeof(nulls));
748
749 values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_attribute->attrelid);
750 values[Anum_pg_attribute_attname - 1] = NameGetDatum(&new_attribute->attname);
751 values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(new_attribute->atttypid);
752 values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(new_attribute->attstattarget);
753 values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(new_attribute->attlen);
754 values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(new_attribute->attnum);
755 values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(new_attribute->attndims);
756 values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(-1);
757 values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(new_attribute->atttypmod);
758 values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(new_attribute->attbyval);
759 values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(new_attribute->attstorage);
760 values[Anum_pg_attribute_attalign - 1] = CharGetDatum(new_attribute->attalign);
761 values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(new_attribute->attnotnull);
762 values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(new_attribute->atthasdef);
763 values[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(new_attribute->atthasmissing);
764 values[Anum_pg_attribute_attidentity - 1] = CharGetDatum(new_attribute->attidentity);
765 values[Anum_pg_attribute_attgenerated - 1] = CharGetDatum(new_attribute->attgenerated);
766 values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(new_attribute->attisdropped);
767 values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal);
768 values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount);
769 values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation);
770
771 /* start out with empty permissions and empty options */
772 nulls[Anum_pg_attribute_attacl - 1] = true;
773 nulls[Anum_pg_attribute_attoptions - 1] = true;
774 nulls[Anum_pg_attribute_attfdwoptions - 1] = true;
775 nulls[Anum_pg_attribute_attmissingval - 1] = true;
776
777 tup = heap_form_tuple(RelationGetDescr(pg_attribute_rel), values, nulls);
778
779 /* finally insert the new tuple, update the indexes, and clean up */
780 if (indstate != NULL)
781 CatalogTupleInsertWithInfo(pg_attribute_rel, tup, indstate);
782 else
783 CatalogTupleInsert(pg_attribute_rel, tup);
784
785 heap_freetuple(tup);
786 }
787
788 /* --------------------------------
789 * AddNewAttributeTuples
790 *
791 * this registers the new relation's schema by adding
792 * tuples to pg_attribute.
793 * --------------------------------
794 */
795 static void
AddNewAttributeTuples(Oid new_rel_oid,TupleDesc tupdesc,char relkind)796 AddNewAttributeTuples(Oid new_rel_oid,
797 TupleDesc tupdesc,
798 char relkind)
799 {
800 Form_pg_attribute attr;
801 int i;
802 Relation rel;
803 CatalogIndexState indstate;
804 int natts = tupdesc->natts;
805 ObjectAddress myself,
806 referenced;
807
808 /*
809 * open pg_attribute and its indexes.
810 */
811 rel = table_open(AttributeRelationId, RowExclusiveLock);
812
813 indstate = CatalogOpenIndexes(rel);
814
815 /*
816 * First we add the user attributes. This is also a convenient place to
817 * add dependencies on their datatypes and collations.
818 */
819 for (i = 0; i < natts; i++)
820 {
821 attr = TupleDescAttr(tupdesc, i);
822 /* Fill in the correct relation OID */
823 attr->attrelid = new_rel_oid;
824 /* Make sure this is OK, too */
825 attr->attstattarget = -1;
826
827 InsertPgAttributeTuple(rel, attr, indstate);
828
829 /* Add dependency info */
830 myself.classId = RelationRelationId;
831 myself.objectId = new_rel_oid;
832 myself.objectSubId = i + 1;
833 referenced.classId = TypeRelationId;
834 referenced.objectId = attr->atttypid;
835 referenced.objectSubId = 0;
836 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
837
838 /* The default collation is pinned, so don't bother recording it */
839 if (OidIsValid(attr->attcollation) &&
840 attr->attcollation != DEFAULT_COLLATION_OID)
841 {
842 referenced.classId = CollationRelationId;
843 referenced.objectId = attr->attcollation;
844 referenced.objectSubId = 0;
845 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
846 }
847 }
848
849 /*
850 * Next we add the system attributes. Skip OID if rel has no OIDs. Skip
851 * all for a view or type relation. We don't bother with making datatype
852 * dependencies here, since presumably all these types are pinned.
853 */
854 if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
855 {
856 for (i = 0; i < (int) lengthof(SysAtt); i++)
857 {
858 FormData_pg_attribute attStruct;
859
860 memcpy(&attStruct, SysAtt[i], sizeof(FormData_pg_attribute));
861
862 /* Fill in the correct relation OID in the copied tuple */
863 attStruct.attrelid = new_rel_oid;
864
865 InsertPgAttributeTuple(rel, &attStruct, indstate);
866 }
867 }
868
869 /*
870 * clean up
871 */
872 CatalogCloseIndexes(indstate);
873
874 table_close(rel, RowExclusiveLock);
875 }
876
877 /* --------------------------------
878 * InsertPgClassTuple
879 *
880 * Construct and insert a new tuple in pg_class.
881 *
882 * Caller has already opened and locked pg_class.
883 * Tuple data is taken from new_rel_desc->rd_rel, except for the
884 * variable-width fields which are not present in a cached reldesc.
885 * relacl and reloptions are passed in Datum form (to avoid having
886 * to reference the data types in heap.h). Pass (Datum) 0 to set them
887 * to NULL.
888 * --------------------------------
889 */
890 void
InsertPgClassTuple(Relation pg_class_desc,Relation new_rel_desc,Oid new_rel_oid,Datum relacl,Datum reloptions)891 InsertPgClassTuple(Relation pg_class_desc,
892 Relation new_rel_desc,
893 Oid new_rel_oid,
894 Datum relacl,
895 Datum reloptions)
896 {
897 Form_pg_class rd_rel = new_rel_desc->rd_rel;
898 Datum values[Natts_pg_class];
899 bool nulls[Natts_pg_class];
900 HeapTuple tup;
901
902 /* This is a tad tedious, but way cleaner than what we used to do... */
903 memset(values, 0, sizeof(values));
904 memset(nulls, false, sizeof(nulls));
905
906 values[Anum_pg_class_oid - 1] = ObjectIdGetDatum(new_rel_oid);
907 values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
908 values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
909 values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
910 values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
911 values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
912 values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
913 values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
914 values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(rd_rel->reltablespace);
915 values[Anum_pg_class_relpages - 1] = Int32GetDatum(rd_rel->relpages);
916 values[Anum_pg_class_reltuples - 1] = Float4GetDatum(rd_rel->reltuples);
917 values[Anum_pg_class_relallvisible - 1] = Int32GetDatum(rd_rel->relallvisible);
918 values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(rd_rel->reltoastrelid);
919 values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
920 values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
921 values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
922 values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
923 values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
924 values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
925 values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
926 values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
927 values[Anum_pg_class_relrowsecurity - 1] = BoolGetDatum(rd_rel->relrowsecurity);
928 values[Anum_pg_class_relforcerowsecurity - 1] = BoolGetDatum(rd_rel->relforcerowsecurity);
929 values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
930 values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
931 values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
932 values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition);
933 values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite);
934 values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
935 values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
936 if (relacl != (Datum) 0)
937 values[Anum_pg_class_relacl - 1] = relacl;
938 else
939 nulls[Anum_pg_class_relacl - 1] = true;
940 if (reloptions != (Datum) 0)
941 values[Anum_pg_class_reloptions - 1] = reloptions;
942 else
943 nulls[Anum_pg_class_reloptions - 1] = true;
944
945 /* relpartbound is set by updating this tuple, if necessary */
946 nulls[Anum_pg_class_relpartbound - 1] = true;
947
948 tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
949
950 /* finally insert the new tuple, update the indexes, and clean up */
951 CatalogTupleInsert(pg_class_desc, tup);
952
953 heap_freetuple(tup);
954 }
955
956 /* --------------------------------
957 * AddNewRelationTuple
958 *
959 * this registers the new relation in the catalogs by
960 * adding a tuple to pg_class.
961 * --------------------------------
962 */
963 static void
AddNewRelationTuple(Relation pg_class_desc,Relation new_rel_desc,Oid new_rel_oid,Oid new_type_oid,Oid reloftype,Oid relowner,char relkind,TransactionId relfrozenxid,TransactionId relminmxid,Datum relacl,Datum reloptions)964 AddNewRelationTuple(Relation pg_class_desc,
965 Relation new_rel_desc,
966 Oid new_rel_oid,
967 Oid new_type_oid,
968 Oid reloftype,
969 Oid relowner,
970 char relkind,
971 TransactionId relfrozenxid,
972 TransactionId relminmxid,
973 Datum relacl,
974 Datum reloptions)
975 {
976 Form_pg_class new_rel_reltup;
977
978 /*
979 * first we update some of the information in our uncataloged relation's
980 * relation descriptor.
981 */
982 new_rel_reltup = new_rel_desc->rd_rel;
983
984 switch (relkind)
985 {
986 case RELKIND_RELATION:
987 case RELKIND_MATVIEW:
988 case RELKIND_INDEX:
989 case RELKIND_TOASTVALUE:
990 /* The relation is real, but as yet empty */
991 new_rel_reltup->relpages = 0;
992 new_rel_reltup->reltuples = 0;
993 new_rel_reltup->relallvisible = 0;
994 break;
995 case RELKIND_SEQUENCE:
996 /* Sequences always have a known size */
997 new_rel_reltup->relpages = 1;
998 new_rel_reltup->reltuples = 1;
999 new_rel_reltup->relallvisible = 0;
1000 break;
1001 default:
1002 /* Views, etc, have no disk storage */
1003 new_rel_reltup->relpages = 0;
1004 new_rel_reltup->reltuples = 0;
1005 new_rel_reltup->relallvisible = 0;
1006 break;
1007 }
1008
1009 new_rel_reltup->relfrozenxid = relfrozenxid;
1010 new_rel_reltup->relminmxid = relminmxid;
1011 new_rel_reltup->relowner = relowner;
1012 new_rel_reltup->reltype = new_type_oid;
1013 new_rel_reltup->reloftype = reloftype;
1014
1015 /* relispartition is always set by updating this tuple later */
1016 new_rel_reltup->relispartition = false;
1017
1018 new_rel_desc->rd_att->tdtypeid = new_type_oid;
1019
1020 /* Now build and insert the tuple */
1021 InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
1022 relacl, reloptions);
1023 }
1024
1025
1026 /* --------------------------------
1027 * AddNewRelationType -
1028 *
1029 * define a composite type corresponding to the new relation
1030 * --------------------------------
1031 */
1032 static ObjectAddress
AddNewRelationType(const char * typeName,Oid typeNamespace,Oid new_rel_oid,char new_rel_kind,Oid ownerid,Oid new_row_type,Oid new_array_type)1033 AddNewRelationType(const char *typeName,
1034 Oid typeNamespace,
1035 Oid new_rel_oid,
1036 char new_rel_kind,
1037 Oid ownerid,
1038 Oid new_row_type,
1039 Oid new_array_type)
1040 {
1041 return
1042 TypeCreate(new_row_type, /* optional predetermined OID */
1043 typeName, /* type name */
1044 typeNamespace, /* type namespace */
1045 new_rel_oid, /* relation oid */
1046 new_rel_kind, /* relation kind */
1047 ownerid, /* owner's ID */
1048 -1, /* internal size (varlena) */
1049 TYPTYPE_COMPOSITE, /* type-type (composite) */
1050 TYPCATEGORY_COMPOSITE, /* type-category (ditto) */
1051 false, /* composite types are never preferred */
1052 DEFAULT_TYPDELIM, /* default array delimiter */
1053 F_RECORD_IN, /* input procedure */
1054 F_RECORD_OUT, /* output procedure */
1055 F_RECORD_RECV, /* receive procedure */
1056 F_RECORD_SEND, /* send procedure */
1057 InvalidOid, /* typmodin procedure - none */
1058 InvalidOid, /* typmodout procedure - none */
1059 InvalidOid, /* analyze procedure - default */
1060 InvalidOid, /* array element type - irrelevant */
1061 false, /* this is not an array type */
1062 new_array_type, /* array type if any */
1063 InvalidOid, /* domain base type - irrelevant */
1064 NULL, /* default value - none */
1065 NULL, /* default binary representation */
1066 false, /* passed by reference */
1067 'd', /* alignment - must be the largest! */
1068 'x', /* fully TOASTable */
1069 -1, /* typmod */
1070 0, /* array dimensions for typBaseType */
1071 false, /* Type NOT NULL */
1072 InvalidOid); /* rowtypes never have a collation */
1073 }
1074
1075 /* --------------------------------
1076 * heap_create_with_catalog
1077 *
1078 * creates a new cataloged relation. see comments above.
1079 *
1080 * Arguments:
1081 * relname: name to give to new rel
1082 * relnamespace: OID of namespace it goes in
1083 * reltablespace: OID of tablespace it goes in
1084 * relid: OID to assign to new rel, or InvalidOid to select a new OID
1085 * reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
1086 * reloftypeid: if a typed table, OID of underlying type; else InvalidOid
1087 * ownerid: OID of new rel's owner
1088 * tupdesc: tuple descriptor (source of column definitions)
1089 * cooked_constraints: list of precooked check constraints and defaults
1090 * relkind: relkind for new rel
1091 * relpersistence: rel's persistence status (permanent, temp, or unlogged)
1092 * shared_relation: true if it's to be a shared relation
1093 * mapped_relation: true if the relation will use the relfilenode map
1094 * oncommit: ON COMMIT marking (only relevant if it's a temp table)
1095 * reloptions: reloptions in Datum form, or (Datum) 0 if none
1096 * use_user_acl: true if should look for user-defined default permissions;
1097 * if false, relacl is always set NULL
1098 * allow_system_table_mods: true to allow creation in system namespaces
1099 * is_internal: is this a system-generated catalog?
1100 *
1101 * Output parameters:
1102 * typaddress: if not null, gets the object address of the new pg_type entry
1103 *
1104 * Returns the OID of the new relation
1105 * --------------------------------
1106 */
1107 Oid
heap_create_with_catalog(const char * relname,Oid relnamespace,Oid reltablespace,Oid relid,Oid reltypeid,Oid reloftypeid,Oid ownerid,Oid accessmtd,TupleDesc tupdesc,List * cooked_constraints,char relkind,char relpersistence,bool shared_relation,bool mapped_relation,OnCommitAction oncommit,Datum reloptions,bool use_user_acl,bool allow_system_table_mods,bool is_internal,Oid relrewrite,ObjectAddress * typaddress)1108 heap_create_with_catalog(const char *relname,
1109 Oid relnamespace,
1110 Oid reltablespace,
1111 Oid relid,
1112 Oid reltypeid,
1113 Oid reloftypeid,
1114 Oid ownerid,
1115 Oid accessmtd,
1116 TupleDesc tupdesc,
1117 List *cooked_constraints,
1118 char relkind,
1119 char relpersistence,
1120 bool shared_relation,
1121 bool mapped_relation,
1122 OnCommitAction oncommit,
1123 Datum reloptions,
1124 bool use_user_acl,
1125 bool allow_system_table_mods,
1126 bool is_internal,
1127 Oid relrewrite,
1128 ObjectAddress *typaddress)
1129 {
1130 Relation pg_class_desc;
1131 Relation new_rel_desc;
1132 Acl *relacl;
1133 Oid existing_relid;
1134 Oid old_type_oid;
1135 Oid new_type_oid;
1136 ObjectAddress new_type_addr;
1137 Oid new_array_oid = InvalidOid;
1138 TransactionId relfrozenxid;
1139 MultiXactId relminmxid;
1140
1141 pg_class_desc = table_open(RelationRelationId, RowExclusiveLock);
1142
1143 /*
1144 * sanity checks
1145 */
1146 Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
1147
1148 /*
1149 * Validate proposed tupdesc for the desired relkind. If
1150 * allow_system_table_mods is on, allow ANYARRAY to be used; this is a
1151 * hack to allow creating pg_statistic and cloning it during VACUUM FULL.
1152 */
1153 CheckAttributeNamesTypes(tupdesc, relkind,
1154 allow_system_table_mods ? CHKATYPE_ANYARRAY : 0);
1155
1156 /*
1157 * This would fail later on anyway, if the relation already exists. But
1158 * by catching it here we can emit a nicer error message.
1159 */
1160 existing_relid = get_relname_relid(relname, relnamespace);
1161 if (existing_relid != InvalidOid)
1162 ereport(ERROR,
1163 (errcode(ERRCODE_DUPLICATE_TABLE),
1164 errmsg("relation \"%s\" already exists", relname)));
1165
1166 /*
1167 * Since we are going to create a rowtype as well, also check for
1168 * collision with an existing type name. If there is one and it's an
1169 * autogenerated array, we can rename it out of the way; otherwise we can
1170 * at least give a good error message.
1171 */
1172 old_type_oid = GetSysCacheOid2(TYPENAMENSP, Anum_pg_type_oid,
1173 CStringGetDatum(relname),
1174 ObjectIdGetDatum(relnamespace));
1175 if (OidIsValid(old_type_oid))
1176 {
1177 if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
1178 ereport(ERROR,
1179 (errcode(ERRCODE_DUPLICATE_OBJECT),
1180 errmsg("type \"%s\" already exists", relname),
1181 errhint("A relation has an associated type of the same name, "
1182 "so you must use a name that doesn't conflict "
1183 "with any existing type.")));
1184 }
1185
1186 /*
1187 * Shared relations must be in pg_global (last-ditch check)
1188 */
1189 if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
1190 elog(ERROR, "shared relations must be placed in pg_global tablespace");
1191
1192 /*
1193 * Allocate an OID for the relation, unless we were told what to use.
1194 *
1195 * The OID will be the relfilenode as well, so make sure it doesn't
1196 * collide with either pg_class OIDs or existing physical files.
1197 */
1198 if (!OidIsValid(relid))
1199 {
1200 /* Use binary-upgrade override for pg_class.oid/relfilenode? */
1201 if (IsBinaryUpgrade &&
1202 (relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
1203 relkind == RELKIND_VIEW || relkind == RELKIND_MATVIEW ||
1204 relkind == RELKIND_COMPOSITE_TYPE || relkind == RELKIND_FOREIGN_TABLE ||
1205 relkind == RELKIND_PARTITIONED_TABLE))
1206 {
1207 if (!OidIsValid(binary_upgrade_next_heap_pg_class_oid))
1208 ereport(ERROR,
1209 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1210 errmsg("pg_class heap OID value not set when in binary upgrade mode")));
1211
1212 relid = binary_upgrade_next_heap_pg_class_oid;
1213 binary_upgrade_next_heap_pg_class_oid = InvalidOid;
1214 }
1215 /* There might be no TOAST table, so we have to test for it. */
1216 else if (IsBinaryUpgrade &&
1217 OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
1218 relkind == RELKIND_TOASTVALUE)
1219 {
1220 relid = binary_upgrade_next_toast_pg_class_oid;
1221 binary_upgrade_next_toast_pg_class_oid = InvalidOid;
1222 }
1223 else
1224 relid = GetNewRelFileNode(reltablespace, pg_class_desc,
1225 relpersistence);
1226 }
1227
1228 /*
1229 * Determine the relation's initial permissions.
1230 */
1231 if (use_user_acl)
1232 {
1233 switch (relkind)
1234 {
1235 case RELKIND_RELATION:
1236 case RELKIND_VIEW:
1237 case RELKIND_MATVIEW:
1238 case RELKIND_FOREIGN_TABLE:
1239 case RELKIND_PARTITIONED_TABLE:
1240 relacl = get_user_default_acl(OBJECT_TABLE, ownerid,
1241 relnamespace);
1242 break;
1243 case RELKIND_SEQUENCE:
1244 relacl = get_user_default_acl(OBJECT_SEQUENCE, ownerid,
1245 relnamespace);
1246 break;
1247 default:
1248 relacl = NULL;
1249 break;
1250 }
1251 }
1252 else
1253 relacl = NULL;
1254
1255 /*
1256 * Create the relcache entry (mostly dummy at this point) and the physical
1257 * disk file. (If we fail further down, it's the smgr's responsibility to
1258 * remove the disk file again.)
1259 */
1260 new_rel_desc = heap_create(relname,
1261 relnamespace,
1262 reltablespace,
1263 relid,
1264 InvalidOid,
1265 accessmtd,
1266 tupdesc,
1267 relkind,
1268 relpersistence,
1269 shared_relation,
1270 mapped_relation,
1271 allow_system_table_mods,
1272 &relfrozenxid,
1273 &relminmxid);
1274
1275 Assert(relid == RelationGetRelid(new_rel_desc));
1276
1277 new_rel_desc->rd_rel->relrewrite = relrewrite;
1278
1279 /*
1280 * Decide whether to create an array type over the relation's rowtype. We
1281 * do not create any array types for system catalogs (ie, those made
1282 * during initdb). We do not create them where the use of a relation as
1283 * such is an implementation detail: toast tables, sequences and indexes.
1284 */
1285 if (IsUnderPostmaster && (relkind == RELKIND_RELATION ||
1286 relkind == RELKIND_VIEW ||
1287 relkind == RELKIND_MATVIEW ||
1288 relkind == RELKIND_FOREIGN_TABLE ||
1289 relkind == RELKIND_COMPOSITE_TYPE ||
1290 relkind == RELKIND_PARTITIONED_TABLE))
1291 new_array_oid = AssignTypeArrayOid();
1292
1293 /*
1294 * Since defining a relation also defines a complex type, we add a new
1295 * system type corresponding to the new relation. The OID of the type can
1296 * be preselected by the caller, but if reltypeid is InvalidOid, we'll
1297 * generate a new OID for it.
1298 *
1299 * NOTE: we could get a unique-index failure here, in case someone else is
1300 * creating the same type name in parallel but hadn't committed yet when
1301 * we checked for a duplicate name above.
1302 */
1303 new_type_addr = AddNewRelationType(relname,
1304 relnamespace,
1305 relid,
1306 relkind,
1307 ownerid,
1308 reltypeid,
1309 new_array_oid);
1310 new_type_oid = new_type_addr.objectId;
1311 if (typaddress)
1312 *typaddress = new_type_addr;
1313
1314 /*
1315 * Now make the array type if wanted.
1316 */
1317 if (OidIsValid(new_array_oid))
1318 {
1319 char *relarrayname;
1320
1321 relarrayname = makeArrayTypeName(relname, relnamespace);
1322
1323 TypeCreate(new_array_oid, /* force the type's OID to this */
1324 relarrayname, /* Array type name */
1325 relnamespace, /* Same namespace as parent */
1326 InvalidOid, /* Not composite, no relationOid */
1327 0, /* relkind, also N/A here */
1328 ownerid, /* owner's ID */
1329 -1, /* Internal size (varlena) */
1330 TYPTYPE_BASE, /* Not composite - typelem is */
1331 TYPCATEGORY_ARRAY, /* type-category (array) */
1332 false, /* array types are never preferred */
1333 DEFAULT_TYPDELIM, /* default array delimiter */
1334 F_ARRAY_IN, /* array input proc */
1335 F_ARRAY_OUT, /* array output proc */
1336 F_ARRAY_RECV, /* array recv (bin) proc */
1337 F_ARRAY_SEND, /* array send (bin) proc */
1338 InvalidOid, /* typmodin procedure - none */
1339 InvalidOid, /* typmodout procedure - none */
1340 F_ARRAY_TYPANALYZE, /* array analyze procedure */
1341 new_type_oid, /* array element type - the rowtype */
1342 true, /* yes, this is an array type */
1343 InvalidOid, /* this has no array type */
1344 InvalidOid, /* domain base type - irrelevant */
1345 NULL, /* default value - none */
1346 NULL, /* default binary representation */
1347 false, /* passed by reference */
1348 'd', /* alignment - must be the largest! */
1349 'x', /* fully TOASTable */
1350 -1, /* typmod */
1351 0, /* array dimensions for typBaseType */
1352 false, /* Type NOT NULL */
1353 InvalidOid); /* rowtypes never have a collation */
1354
1355 pfree(relarrayname);
1356 }
1357
1358 /*
1359 * now create an entry in pg_class for the relation.
1360 *
1361 * NOTE: we could get a unique-index failure here, in case someone else is
1362 * creating the same relation name in parallel but hadn't committed yet
1363 * when we checked for a duplicate name above.
1364 */
1365 AddNewRelationTuple(pg_class_desc,
1366 new_rel_desc,
1367 relid,
1368 new_type_oid,
1369 reloftypeid,
1370 ownerid,
1371 relkind,
1372 relfrozenxid,
1373 relminmxid,
1374 PointerGetDatum(relacl),
1375 reloptions);
1376
1377 /*
1378 * now add tuples to pg_attribute for the attributes in our new relation.
1379 */
1380 AddNewAttributeTuples(relid, new_rel_desc->rd_att, relkind);
1381
1382 /*
1383 * Make a dependency link to force the relation to be deleted if its
1384 * namespace is. Also make a dependency link to its owner, as well as
1385 * dependencies for any roles mentioned in the default ACL.
1386 *
1387 * For composite types, these dependencies are tracked for the pg_type
1388 * entry, so we needn't record them here. Likewise, TOAST tables don't
1389 * need a namespace dependency (they live in a pinned namespace) nor an
1390 * owner dependency (they depend indirectly through the parent table), nor
1391 * should they have any ACL entries. The same applies for extension
1392 * dependencies.
1393 *
1394 * Also, skip this in bootstrap mode, since we don't make dependencies
1395 * while bootstrapping.
1396 */
1397 if (relkind != RELKIND_COMPOSITE_TYPE &&
1398 relkind != RELKIND_TOASTVALUE &&
1399 !IsBootstrapProcessingMode())
1400 {
1401 ObjectAddress myself,
1402 referenced;
1403
1404 myself.classId = RelationRelationId;
1405 myself.objectId = relid;
1406 myself.objectSubId = 0;
1407
1408 referenced.classId = NamespaceRelationId;
1409 referenced.objectId = relnamespace;
1410 referenced.objectSubId = 0;
1411 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1412
1413 recordDependencyOnOwner(RelationRelationId, relid, ownerid);
1414
1415 recordDependencyOnNewAcl(RelationRelationId, relid, 0, ownerid, relacl);
1416
1417 recordDependencyOnCurrentExtension(&myself, false);
1418
1419 if (reloftypeid)
1420 {
1421 referenced.classId = TypeRelationId;
1422 referenced.objectId = reloftypeid;
1423 referenced.objectSubId = 0;
1424 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1425 }
1426
1427 /*
1428 * Make a dependency link to force the relation to be deleted if its
1429 * access method is. Do this only for relation and materialized views.
1430 *
1431 * No need to add an explicit dependency for the toast table, as the
1432 * main table depends on it.
1433 */
1434 if (relkind == RELKIND_RELATION ||
1435 relkind == RELKIND_MATVIEW)
1436 {
1437 referenced.classId = AccessMethodRelationId;
1438 referenced.objectId = accessmtd;
1439 referenced.objectSubId = 0;
1440 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1441 }
1442 }
1443
1444 /* Post creation hook for new relation */
1445 InvokeObjectPostCreateHookArg(RelationRelationId, relid, 0, is_internal);
1446
1447 /*
1448 * Store any supplied constraints and defaults.
1449 *
1450 * NB: this may do a CommandCounterIncrement and rebuild the relcache
1451 * entry, so the relation must be valid and self-consistent at this point.
1452 * In particular, there are not yet constraints and defaults anywhere.
1453 */
1454 StoreConstraints(new_rel_desc, cooked_constraints, is_internal);
1455
1456 /*
1457 * If there's a special on-commit action, remember it
1458 */
1459 if (oncommit != ONCOMMIT_NOOP)
1460 register_on_commit_action(relid, oncommit);
1461
1462 /*
1463 * ok, the relation has been cataloged, so close our relations and return
1464 * the OID of the newly created relation.
1465 */
1466 table_close(new_rel_desc, NoLock); /* do not unlock till end of xact */
1467 table_close(pg_class_desc, RowExclusiveLock);
1468
1469 return relid;
1470 }
1471
1472 /*
1473 * RelationRemoveInheritance
1474 *
1475 * Formerly, this routine checked for child relations and aborted the
1476 * deletion if any were found. Now we rely on the dependency mechanism
1477 * to check for or delete child relations. By the time we get here,
1478 * there are no children and we need only remove any pg_inherits rows
1479 * linking this relation to its parent(s).
1480 */
1481 static void
RelationRemoveInheritance(Oid relid)1482 RelationRemoveInheritance(Oid relid)
1483 {
1484 Relation catalogRelation;
1485 SysScanDesc scan;
1486 ScanKeyData key;
1487 HeapTuple tuple;
1488
1489 catalogRelation = table_open(InheritsRelationId, RowExclusiveLock);
1490
1491 ScanKeyInit(&key,
1492 Anum_pg_inherits_inhrelid,
1493 BTEqualStrategyNumber, F_OIDEQ,
1494 ObjectIdGetDatum(relid));
1495
1496 scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true,
1497 NULL, 1, &key);
1498
1499 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1500 CatalogTupleDelete(catalogRelation, &tuple->t_self);
1501
1502 systable_endscan(scan);
1503 table_close(catalogRelation, RowExclusiveLock);
1504 }
1505
1506 /*
1507 * DeleteRelationTuple
1508 *
1509 * Remove pg_class row for the given relid.
1510 *
1511 * Note: this is shared by relation deletion and index deletion. It's
1512 * not intended for use anyplace else.
1513 */
1514 void
DeleteRelationTuple(Oid relid)1515 DeleteRelationTuple(Oid relid)
1516 {
1517 Relation pg_class_desc;
1518 HeapTuple tup;
1519
1520 /* Grab an appropriate lock on the pg_class relation */
1521 pg_class_desc = table_open(RelationRelationId, RowExclusiveLock);
1522
1523 tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1524 if (!HeapTupleIsValid(tup))
1525 elog(ERROR, "cache lookup failed for relation %u", relid);
1526
1527 /* delete the relation tuple from pg_class, and finish up */
1528 CatalogTupleDelete(pg_class_desc, &tup->t_self);
1529
1530 ReleaseSysCache(tup);
1531
1532 table_close(pg_class_desc, RowExclusiveLock);
1533 }
1534
1535 /*
1536 * DeleteAttributeTuples
1537 *
1538 * Remove pg_attribute rows for the given relid.
1539 *
1540 * Note: this is shared by relation deletion and index deletion. It's
1541 * not intended for use anyplace else.
1542 */
1543 void
DeleteAttributeTuples(Oid relid)1544 DeleteAttributeTuples(Oid relid)
1545 {
1546 Relation attrel;
1547 SysScanDesc scan;
1548 ScanKeyData key[1];
1549 HeapTuple atttup;
1550
1551 /* Grab an appropriate lock on the pg_attribute relation */
1552 attrel = table_open(AttributeRelationId, RowExclusiveLock);
1553
1554 /* Use the index to scan only attributes of the target relation */
1555 ScanKeyInit(&key[0],
1556 Anum_pg_attribute_attrelid,
1557 BTEqualStrategyNumber, F_OIDEQ,
1558 ObjectIdGetDatum(relid));
1559
1560 scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1561 NULL, 1, key);
1562
1563 /* Delete all the matching tuples */
1564 while ((atttup = systable_getnext(scan)) != NULL)
1565 CatalogTupleDelete(attrel, &atttup->t_self);
1566
1567 /* Clean up after the scan */
1568 systable_endscan(scan);
1569 table_close(attrel, RowExclusiveLock);
1570 }
1571
1572 /*
1573 * DeleteSystemAttributeTuples
1574 *
1575 * Remove pg_attribute rows for system columns of the given relid.
1576 *
1577 * Note: this is only used when converting a table to a view. Views don't
1578 * have system columns, so we should remove them from pg_attribute.
1579 */
1580 void
DeleteSystemAttributeTuples(Oid relid)1581 DeleteSystemAttributeTuples(Oid relid)
1582 {
1583 Relation attrel;
1584 SysScanDesc scan;
1585 ScanKeyData key[2];
1586 HeapTuple atttup;
1587
1588 /* Grab an appropriate lock on the pg_attribute relation */
1589 attrel = table_open(AttributeRelationId, RowExclusiveLock);
1590
1591 /* Use the index to scan only system attributes of the target relation */
1592 ScanKeyInit(&key[0],
1593 Anum_pg_attribute_attrelid,
1594 BTEqualStrategyNumber, F_OIDEQ,
1595 ObjectIdGetDatum(relid));
1596 ScanKeyInit(&key[1],
1597 Anum_pg_attribute_attnum,
1598 BTLessEqualStrategyNumber, F_INT2LE,
1599 Int16GetDatum(0));
1600
1601 scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1602 NULL, 2, key);
1603
1604 /* Delete all the matching tuples */
1605 while ((atttup = systable_getnext(scan)) != NULL)
1606 CatalogTupleDelete(attrel, &atttup->t_self);
1607
1608 /* Clean up after the scan */
1609 systable_endscan(scan);
1610 table_close(attrel, RowExclusiveLock);
1611 }
1612
1613 /*
1614 * RemoveAttributeById
1615 *
1616 * This is the guts of ALTER TABLE DROP COLUMN: actually mark the attribute
1617 * deleted in pg_attribute. We also remove pg_statistic entries for it.
1618 * (Everything else needed, such as getting rid of any pg_attrdef entry,
1619 * is handled by dependency.c.)
1620 */
1621 void
RemoveAttributeById(Oid relid,AttrNumber attnum)1622 RemoveAttributeById(Oid relid, AttrNumber attnum)
1623 {
1624 Relation rel;
1625 Relation attr_rel;
1626 HeapTuple tuple;
1627 Form_pg_attribute attStruct;
1628 char newattname[NAMEDATALEN];
1629
1630 /*
1631 * Grab an exclusive lock on the target table, which we will NOT release
1632 * until end of transaction. (In the simple case where we are directly
1633 * dropping this column, ATExecDropColumn already did this ... but when
1634 * cascading from a drop of some other object, we may not have any lock.)
1635 */
1636 rel = relation_open(relid, AccessExclusiveLock);
1637
1638 attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
1639
1640 tuple = SearchSysCacheCopy2(ATTNUM,
1641 ObjectIdGetDatum(relid),
1642 Int16GetDatum(attnum));
1643 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
1644 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1645 attnum, relid);
1646 attStruct = (Form_pg_attribute) GETSTRUCT(tuple);
1647
1648 if (attnum < 0)
1649 {
1650 /* System attribute (probably OID) ... just delete the row */
1651
1652 CatalogTupleDelete(attr_rel, &tuple->t_self);
1653 }
1654 else
1655 {
1656 /* Dropping user attributes is lots harder */
1657
1658 /* Mark the attribute as dropped */
1659 attStruct->attisdropped = true;
1660
1661 /*
1662 * Set the type OID to invalid. A dropped attribute's type link
1663 * cannot be relied on (once the attribute is dropped, the type might
1664 * be too). Fortunately we do not need the type row --- the only
1665 * really essential information is the type's typlen and typalign,
1666 * which are preserved in the attribute's attlen and attalign. We set
1667 * atttypid to zero here as a means of catching code that incorrectly
1668 * expects it to be valid.
1669 */
1670 attStruct->atttypid = InvalidOid;
1671
1672 /* Remove any NOT NULL constraint the column may have */
1673 attStruct->attnotnull = false;
1674
1675 /* We don't want to keep stats for it anymore */
1676 attStruct->attstattarget = 0;
1677
1678 /* Unset this so no one tries to look up the generation expression */
1679 attStruct->attgenerated = '\0';
1680
1681 /*
1682 * Change the column name to something that isn't likely to conflict
1683 */
1684 snprintf(newattname, sizeof(newattname),
1685 "........pg.dropped.%d........", attnum);
1686 namestrcpy(&(attStruct->attname), newattname);
1687
1688 /* clear the missing value if any */
1689 if (attStruct->atthasmissing)
1690 {
1691 Datum valuesAtt[Natts_pg_attribute];
1692 bool nullsAtt[Natts_pg_attribute];
1693 bool replacesAtt[Natts_pg_attribute];
1694
1695 /* update the tuple - set atthasmissing and attmissingval */
1696 MemSet(valuesAtt, 0, sizeof(valuesAtt));
1697 MemSet(nullsAtt, false, sizeof(nullsAtt));
1698 MemSet(replacesAtt, false, sizeof(replacesAtt));
1699
1700 valuesAtt[Anum_pg_attribute_atthasmissing - 1] =
1701 BoolGetDatum(false);
1702 replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
1703 valuesAtt[Anum_pg_attribute_attmissingval - 1] = (Datum) 0;
1704 nullsAtt[Anum_pg_attribute_attmissingval - 1] = true;
1705 replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
1706
1707 tuple = heap_modify_tuple(tuple, RelationGetDescr(attr_rel),
1708 valuesAtt, nullsAtt, replacesAtt);
1709 }
1710
1711 CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
1712 }
1713
1714 /*
1715 * Because updating the pg_attribute row will trigger a relcache flush for
1716 * the target relation, we need not do anything else to notify other
1717 * backends of the change.
1718 */
1719
1720 table_close(attr_rel, RowExclusiveLock);
1721
1722 if (attnum > 0)
1723 RemoveStatistics(relid, attnum);
1724
1725 relation_close(rel, NoLock);
1726 }
1727
1728 /*
1729 * RemoveAttrDefault
1730 *
1731 * If the specified relation/attribute has a default, remove it.
1732 * (If no default, raise error if complain is true, else return quietly.)
1733 */
1734 void
RemoveAttrDefault(Oid relid,AttrNumber attnum,DropBehavior behavior,bool complain,bool internal)1735 RemoveAttrDefault(Oid relid, AttrNumber attnum,
1736 DropBehavior behavior, bool complain, bool internal)
1737 {
1738 Relation attrdef_rel;
1739 ScanKeyData scankeys[2];
1740 SysScanDesc scan;
1741 HeapTuple tuple;
1742 bool found = false;
1743
1744 attrdef_rel = table_open(AttrDefaultRelationId, RowExclusiveLock);
1745
1746 ScanKeyInit(&scankeys[0],
1747 Anum_pg_attrdef_adrelid,
1748 BTEqualStrategyNumber, F_OIDEQ,
1749 ObjectIdGetDatum(relid));
1750 ScanKeyInit(&scankeys[1],
1751 Anum_pg_attrdef_adnum,
1752 BTEqualStrategyNumber, F_INT2EQ,
1753 Int16GetDatum(attnum));
1754
1755 scan = systable_beginscan(attrdef_rel, AttrDefaultIndexId, true,
1756 NULL, 2, scankeys);
1757
1758 /* There should be at most one matching tuple, but we loop anyway */
1759 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1760 {
1761 ObjectAddress object;
1762 Form_pg_attrdef attrtuple = (Form_pg_attrdef) GETSTRUCT(tuple);
1763
1764 object.classId = AttrDefaultRelationId;
1765 object.objectId = attrtuple->oid;
1766 object.objectSubId = 0;
1767
1768 performDeletion(&object, behavior,
1769 internal ? PERFORM_DELETION_INTERNAL : 0);
1770
1771 found = true;
1772 }
1773
1774 systable_endscan(scan);
1775 table_close(attrdef_rel, RowExclusiveLock);
1776
1777 if (complain && !found)
1778 elog(ERROR, "could not find attrdef tuple for relation %u attnum %d",
1779 relid, attnum);
1780 }
1781
1782 /*
1783 * RemoveAttrDefaultById
1784 *
1785 * Remove a pg_attrdef entry specified by OID. This is the guts of
1786 * attribute-default removal. Note it should be called via performDeletion,
1787 * not directly.
1788 */
1789 void
RemoveAttrDefaultById(Oid attrdefId)1790 RemoveAttrDefaultById(Oid attrdefId)
1791 {
1792 Relation attrdef_rel;
1793 Relation attr_rel;
1794 Relation myrel;
1795 ScanKeyData scankeys[1];
1796 SysScanDesc scan;
1797 HeapTuple tuple;
1798 Oid myrelid;
1799 AttrNumber myattnum;
1800
1801 /* Grab an appropriate lock on the pg_attrdef relation */
1802 attrdef_rel = table_open(AttrDefaultRelationId, RowExclusiveLock);
1803
1804 /* Find the pg_attrdef tuple */
1805 ScanKeyInit(&scankeys[0],
1806 Anum_pg_attrdef_oid,
1807 BTEqualStrategyNumber, F_OIDEQ,
1808 ObjectIdGetDatum(attrdefId));
1809
1810 scan = systable_beginscan(attrdef_rel, AttrDefaultOidIndexId, true,
1811 NULL, 1, scankeys);
1812
1813 tuple = systable_getnext(scan);
1814 if (!HeapTupleIsValid(tuple))
1815 elog(ERROR, "could not find tuple for attrdef %u", attrdefId);
1816
1817 myrelid = ((Form_pg_attrdef) GETSTRUCT(tuple))->adrelid;
1818 myattnum = ((Form_pg_attrdef) GETSTRUCT(tuple))->adnum;
1819
1820 /* Get an exclusive lock on the relation owning the attribute */
1821 myrel = relation_open(myrelid, AccessExclusiveLock);
1822
1823 /* Now we can delete the pg_attrdef row */
1824 CatalogTupleDelete(attrdef_rel, &tuple->t_self);
1825
1826 systable_endscan(scan);
1827 table_close(attrdef_rel, RowExclusiveLock);
1828
1829 /* Fix the pg_attribute row */
1830 attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
1831
1832 tuple = SearchSysCacheCopy2(ATTNUM,
1833 ObjectIdGetDatum(myrelid),
1834 Int16GetDatum(myattnum));
1835 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
1836 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1837 myattnum, myrelid);
1838
1839 ((Form_pg_attribute) GETSTRUCT(tuple))->atthasdef = false;
1840
1841 CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
1842
1843 /*
1844 * Our update of the pg_attribute row will force a relcache rebuild, so
1845 * there's nothing else to do here.
1846 */
1847 table_close(attr_rel, RowExclusiveLock);
1848
1849 /* Keep lock on attribute's rel until end of xact */
1850 relation_close(myrel, NoLock);
1851 }
1852
1853 /*
1854 * heap_drop_with_catalog - removes specified relation from catalogs
1855 *
1856 * Note that this routine is not responsible for dropping objects that are
1857 * linked to the pg_class entry via dependencies (for example, indexes and
1858 * constraints). Those are deleted by the dependency-tracing logic in
1859 * dependency.c before control gets here. In general, therefore, this routine
1860 * should never be called directly; go through performDeletion() instead.
1861 */
1862 void
heap_drop_with_catalog(Oid relid)1863 heap_drop_with_catalog(Oid relid)
1864 {
1865 Relation rel;
1866 HeapTuple tuple;
1867 Oid parentOid = InvalidOid,
1868 defaultPartOid = InvalidOid;
1869
1870 /*
1871 * To drop a partition safely, we must grab exclusive lock on its parent,
1872 * because another backend might be about to execute a query on the parent
1873 * table. If it relies on previously cached partition descriptor, then it
1874 * could attempt to access the just-dropped relation as its partition. We
1875 * must therefore take a table lock strong enough to prevent all queries
1876 * on the table from proceeding until we commit and send out a
1877 * shared-cache-inval notice that will make them update their partition
1878 * descriptors.
1879 */
1880 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1881 if (!HeapTupleIsValid(tuple))
1882 elog(ERROR, "cache lookup failed for relation %u", relid);
1883 if (((Form_pg_class) GETSTRUCT(tuple))->relispartition)
1884 {
1885 parentOid = get_partition_parent(relid);
1886 LockRelationOid(parentOid, AccessExclusiveLock);
1887
1888 /*
1889 * If this is not the default partition, dropping it will change the
1890 * default partition's partition constraint, so we must lock it.
1891 */
1892 defaultPartOid = get_default_partition_oid(parentOid);
1893 if (OidIsValid(defaultPartOid) && relid != defaultPartOid)
1894 LockRelationOid(defaultPartOid, AccessExclusiveLock);
1895 }
1896
1897 ReleaseSysCache(tuple);
1898
1899 /*
1900 * Open and lock the relation.
1901 */
1902 rel = relation_open(relid, AccessExclusiveLock);
1903
1904 /*
1905 * There can no longer be anyone *else* touching the relation, but we
1906 * might still have open queries or cursors, or pending trigger events, in
1907 * our own session.
1908 */
1909 CheckTableNotInUse(rel, "DROP TABLE");
1910
1911 /*
1912 * This effectively deletes all rows in the table, and may be done in a
1913 * serializable transaction. In that case we must record a rw-conflict in
1914 * to this transaction from each transaction holding a predicate lock on
1915 * the table.
1916 */
1917 CheckTableForSerializableConflictIn(rel);
1918
1919 /*
1920 * Delete pg_foreign_table tuple first.
1921 */
1922 if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1923 {
1924 Relation rel;
1925 HeapTuple tuple;
1926
1927 rel = table_open(ForeignTableRelationId, RowExclusiveLock);
1928
1929 tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
1930 if (!HeapTupleIsValid(tuple))
1931 elog(ERROR, "cache lookup failed for foreign table %u", relid);
1932
1933 CatalogTupleDelete(rel, &tuple->t_self);
1934
1935 ReleaseSysCache(tuple);
1936 table_close(rel, RowExclusiveLock);
1937 }
1938
1939 /*
1940 * If a partitioned table, delete the pg_partitioned_table tuple.
1941 */
1942 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1943 RemovePartitionKeyByRelId(relid);
1944
1945 /*
1946 * If the relation being dropped is the default partition itself,
1947 * invalidate its entry in pg_partitioned_table.
1948 */
1949 if (relid == defaultPartOid)
1950 update_default_partition_oid(parentOid, InvalidOid);
1951
1952 /*
1953 * Schedule unlinking of the relation's physical files at commit.
1954 */
1955 if (rel->rd_rel->relkind != RELKIND_VIEW &&
1956 rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
1957 rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
1958 rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1959 {
1960 RelationDropStorage(rel);
1961 }
1962
1963 /*
1964 * Close relcache entry, but *keep* AccessExclusiveLock on the relation
1965 * until transaction commit. This ensures no one else will try to do
1966 * something with the doomed relation.
1967 */
1968 relation_close(rel, NoLock);
1969
1970 /*
1971 * Remove any associated relation synchronization states.
1972 */
1973 RemoveSubscriptionRel(InvalidOid, relid);
1974
1975 /*
1976 * Forget any ON COMMIT action for the rel
1977 */
1978 remove_on_commit_action(relid);
1979
1980 /*
1981 * Flush the relation from the relcache. We want to do this before
1982 * starting to remove catalog entries, just to be certain that no relcache
1983 * entry rebuild will happen partway through. (That should not really
1984 * matter, since we don't do CommandCounterIncrement here, but let's be
1985 * safe.)
1986 */
1987 RelationForgetRelation(relid);
1988
1989 /*
1990 * remove inheritance information
1991 */
1992 RelationRemoveInheritance(relid);
1993
1994 /*
1995 * delete statistics
1996 */
1997 RemoveStatistics(relid, 0);
1998
1999 /*
2000 * delete attribute tuples
2001 */
2002 DeleteAttributeTuples(relid);
2003
2004 /*
2005 * delete relation tuple
2006 */
2007 DeleteRelationTuple(relid);
2008
2009 if (OidIsValid(parentOid))
2010 {
2011 /*
2012 * If this is not the default partition, the partition constraint of
2013 * the default partition has changed to include the portion of the key
2014 * space previously covered by the dropped partition.
2015 */
2016 if (OidIsValid(defaultPartOid) && relid != defaultPartOid)
2017 CacheInvalidateRelcacheByRelid(defaultPartOid);
2018
2019 /*
2020 * Invalidate the parent's relcache so that the partition is no longer
2021 * included in its partition descriptor.
2022 */
2023 CacheInvalidateRelcacheByRelid(parentOid);
2024 /* keep the lock */
2025 }
2026 }
2027
2028
2029 /*
2030 * RelationClearMissing
2031 *
2032 * Set atthasmissing and attmissingval to false/null for all attributes
2033 * where they are currently set. This can be safely and usefully done if
2034 * the table is rewritten (e.g. by VACUUM FULL or CLUSTER) where we know there
2035 * are no rows left with less than a full complement of attributes.
2036 *
2037 * The caller must have an AccessExclusive lock on the relation.
2038 */
2039 void
RelationClearMissing(Relation rel)2040 RelationClearMissing(Relation rel)
2041 {
2042 Relation attr_rel;
2043 Oid relid = RelationGetRelid(rel);
2044 int natts = RelationGetNumberOfAttributes(rel);
2045 int attnum;
2046 Datum repl_val[Natts_pg_attribute];
2047 bool repl_null[Natts_pg_attribute];
2048 bool repl_repl[Natts_pg_attribute];
2049 Form_pg_attribute attrtuple;
2050 HeapTuple tuple,
2051 newtuple;
2052
2053 memset(repl_val, 0, sizeof(repl_val));
2054 memset(repl_null, false, sizeof(repl_null));
2055 memset(repl_repl, false, sizeof(repl_repl));
2056
2057 repl_val[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(false);
2058 repl_null[Anum_pg_attribute_attmissingval - 1] = true;
2059
2060 repl_repl[Anum_pg_attribute_atthasmissing - 1] = true;
2061 repl_repl[Anum_pg_attribute_attmissingval - 1] = true;
2062
2063
2064 /* Get a lock on pg_attribute */
2065 attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
2066
2067 /* process each non-system attribute, including any dropped columns */
2068 for (attnum = 1; attnum <= natts; attnum++)
2069 {
2070 tuple = SearchSysCache2(ATTNUM,
2071 ObjectIdGetDatum(relid),
2072 Int16GetDatum(attnum));
2073 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
2074 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
2075 attnum, relid);
2076
2077 attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
2078
2079 /* ignore any where atthasmissing is not true */
2080 if (attrtuple->atthasmissing)
2081 {
2082 newtuple = heap_modify_tuple(tuple, RelationGetDescr(attr_rel),
2083 repl_val, repl_null, repl_repl);
2084
2085 CatalogTupleUpdate(attr_rel, &newtuple->t_self, newtuple);
2086
2087 heap_freetuple(newtuple);
2088 }
2089
2090 ReleaseSysCache(tuple);
2091 }
2092
2093 /*
2094 * Our update of the pg_attribute rows will force a relcache rebuild, so
2095 * there's nothing else to do here.
2096 */
2097 table_close(attr_rel, RowExclusiveLock);
2098 }
2099
2100 /*
2101 * SetAttrMissing
2102 *
2103 * Set the missing value of a single attribute. This should only be used by
2104 * binary upgrade. Takes an AccessExclusive lock on the relation owning the
2105 * attribute.
2106 */
2107 void
SetAttrMissing(Oid relid,char * attname,char * value)2108 SetAttrMissing(Oid relid, char *attname, char *value)
2109 {
2110 Datum valuesAtt[Natts_pg_attribute];
2111 bool nullsAtt[Natts_pg_attribute];
2112 bool replacesAtt[Natts_pg_attribute];
2113 Datum missingval;
2114 Form_pg_attribute attStruct;
2115 Relation attrrel,
2116 tablerel;
2117 HeapTuple atttup,
2118 newtup;
2119
2120 /* lock the table the attribute belongs to */
2121 tablerel = table_open(relid, AccessExclusiveLock);
2122
2123 /* Don't do anything unless it's a plain table */
2124 if (tablerel->rd_rel->relkind != RELKIND_RELATION)
2125 {
2126 table_close(tablerel, AccessExclusiveLock);
2127 return;
2128 }
2129
2130 /* Lock the attribute row and get the data */
2131 attrrel = table_open(AttributeRelationId, RowExclusiveLock);
2132 atttup = SearchSysCacheAttName(relid, attname);
2133 if (!HeapTupleIsValid(atttup))
2134 elog(ERROR, "cache lookup failed for attribute %s of relation %u",
2135 attname, relid);
2136 attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
2137
2138 /* get an array value from the value string */
2139 missingval = OidFunctionCall3(F_ARRAY_IN,
2140 CStringGetDatum(value),
2141 ObjectIdGetDatum(attStruct->atttypid),
2142 Int32GetDatum(attStruct->atttypmod));
2143
2144 /* update the tuple - set atthasmissing and attmissingval */
2145 MemSet(valuesAtt, 0, sizeof(valuesAtt));
2146 MemSet(nullsAtt, false, sizeof(nullsAtt));
2147 MemSet(replacesAtt, false, sizeof(replacesAtt));
2148
2149 valuesAtt[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(true);
2150 replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
2151 valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
2152 replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
2153
2154 newtup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
2155 valuesAtt, nullsAtt, replacesAtt);
2156 CatalogTupleUpdate(attrrel, &newtup->t_self, newtup);
2157
2158 /* clean up */
2159 ReleaseSysCache(atttup);
2160 table_close(attrrel, RowExclusiveLock);
2161 table_close(tablerel, AccessExclusiveLock);
2162 }
2163
2164 /*
2165 * Store a default expression for column attnum of relation rel.
2166 *
2167 * Returns the OID of the new pg_attrdef tuple.
2168 *
2169 * add_column_mode must be true if we are storing the default for a new
2170 * attribute, and false if it's for an already existing attribute. The reason
2171 * for this is that the missing value must never be updated after it is set,
2172 * which can only be when a column is added to the table. Otherwise we would
2173 * in effect be changing existing tuples.
2174 */
2175 Oid
StoreAttrDefault(Relation rel,AttrNumber attnum,Node * expr,bool is_internal,bool add_column_mode)2176 StoreAttrDefault(Relation rel, AttrNumber attnum,
2177 Node *expr, bool is_internal, bool add_column_mode)
2178 {
2179 char *adbin;
2180 Relation adrel;
2181 HeapTuple tuple;
2182 Datum values[4];
2183 static bool nulls[4] = {false, false, false, false};
2184 Relation attrrel;
2185 HeapTuple atttup;
2186 Form_pg_attribute attStruct;
2187 char attgenerated;
2188 Oid attrdefOid;
2189 ObjectAddress colobject,
2190 defobject;
2191
2192 adrel = table_open(AttrDefaultRelationId, RowExclusiveLock);
2193
2194 /*
2195 * Flatten expression to string form for storage.
2196 */
2197 adbin = nodeToString(expr);
2198
2199 /*
2200 * Make the pg_attrdef entry.
2201 */
2202 attrdefOid = GetNewOidWithIndex(adrel, AttrDefaultOidIndexId,
2203 Anum_pg_attrdef_oid);
2204 values[Anum_pg_attrdef_oid - 1] = ObjectIdGetDatum(attrdefOid);
2205 values[Anum_pg_attrdef_adrelid - 1] = RelationGetRelid(rel);
2206 values[Anum_pg_attrdef_adnum - 1] = attnum;
2207 values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
2208
2209 tuple = heap_form_tuple(adrel->rd_att, values, nulls);
2210 CatalogTupleInsert(adrel, tuple);
2211
2212 defobject.classId = AttrDefaultRelationId;
2213 defobject.objectId = attrdefOid;
2214 defobject.objectSubId = 0;
2215
2216 table_close(adrel, RowExclusiveLock);
2217
2218 /* now can free some of the stuff allocated above */
2219 pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
2220 heap_freetuple(tuple);
2221 pfree(adbin);
2222
2223 /*
2224 * Update the pg_attribute entry for the column to show that a default
2225 * exists.
2226 */
2227 attrrel = table_open(AttributeRelationId, RowExclusiveLock);
2228 atttup = SearchSysCacheCopy2(ATTNUM,
2229 ObjectIdGetDatum(RelationGetRelid(rel)),
2230 Int16GetDatum(attnum));
2231 if (!HeapTupleIsValid(atttup))
2232 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
2233 attnum, RelationGetRelid(rel));
2234 attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
2235 attgenerated = attStruct->attgenerated;
2236 if (!attStruct->atthasdef)
2237 {
2238 Form_pg_attribute defAttStruct;
2239
2240 ExprState *exprState;
2241 Expr *expr2 = (Expr *) expr;
2242 EState *estate = NULL;
2243 ExprContext *econtext;
2244 Datum valuesAtt[Natts_pg_attribute];
2245 bool nullsAtt[Natts_pg_attribute];
2246 bool replacesAtt[Natts_pg_attribute];
2247 Datum missingval = (Datum) 0;
2248 bool missingIsNull = true;
2249
2250 MemSet(valuesAtt, 0, sizeof(valuesAtt));
2251 MemSet(nullsAtt, false, sizeof(nullsAtt));
2252 MemSet(replacesAtt, false, sizeof(replacesAtt));
2253 valuesAtt[Anum_pg_attribute_atthasdef - 1] = true;
2254 replacesAtt[Anum_pg_attribute_atthasdef - 1] = true;
2255
2256 if (rel->rd_rel->relkind == RELKIND_RELATION && add_column_mode &&
2257 !attgenerated)
2258 {
2259 expr2 = expression_planner(expr2);
2260 estate = CreateExecutorState();
2261 exprState = ExecPrepareExpr(expr2, estate);
2262 econtext = GetPerTupleExprContext(estate);
2263
2264 missingval = ExecEvalExpr(exprState, econtext,
2265 &missingIsNull);
2266
2267 FreeExecutorState(estate);
2268
2269 defAttStruct = TupleDescAttr(rel->rd_att, attnum - 1);
2270
2271 if (missingIsNull)
2272 {
2273 /* if the default evaluates to NULL, just store a NULL array */
2274 missingval = (Datum) 0;
2275 }
2276 else
2277 {
2278 /* otherwise make a one-element array of the value */
2279 missingval = PointerGetDatum(
2280 construct_array(&missingval,
2281 1,
2282 defAttStruct->atttypid,
2283 defAttStruct->attlen,
2284 defAttStruct->attbyval,
2285 defAttStruct->attalign));
2286 }
2287
2288 valuesAtt[Anum_pg_attribute_atthasmissing - 1] = !missingIsNull;
2289 replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
2290 valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
2291 replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
2292 nullsAtt[Anum_pg_attribute_attmissingval - 1] = missingIsNull;
2293 }
2294 atttup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
2295 valuesAtt, nullsAtt, replacesAtt);
2296
2297 CatalogTupleUpdate(attrrel, &atttup->t_self, atttup);
2298
2299 if (!missingIsNull)
2300 pfree(DatumGetPointer(missingval));
2301
2302 }
2303 table_close(attrrel, RowExclusiveLock);
2304 heap_freetuple(atttup);
2305
2306 /*
2307 * Make a dependency so that the pg_attrdef entry goes away if the column
2308 * (or whole table) is deleted.
2309 */
2310 colobject.classId = RelationRelationId;
2311 colobject.objectId = RelationGetRelid(rel);
2312 colobject.objectSubId = attnum;
2313
2314 recordDependencyOn(&defobject, &colobject, DEPENDENCY_AUTO);
2315
2316 /*
2317 * Record dependencies on objects used in the expression, too.
2318 */
2319 if (attgenerated)
2320 {
2321 /*
2322 * Generated column: Dropping anything that the generation expression
2323 * refers to automatically drops the generated column.
2324 */
2325 recordDependencyOnSingleRelExpr(&colobject, expr, RelationGetRelid(rel),
2326 DEPENDENCY_AUTO,
2327 DEPENDENCY_AUTO, false);
2328 }
2329 else
2330 {
2331 /*
2332 * Normal default: Dropping anything that the default refers to
2333 * requires CASCADE and drops the default only.
2334 */
2335 recordDependencyOnSingleRelExpr(&defobject, expr, RelationGetRelid(rel),
2336 DEPENDENCY_NORMAL,
2337 DEPENDENCY_NORMAL, false);
2338 }
2339
2340 /*
2341 * Post creation hook for attribute defaults.
2342 *
2343 * XXX. ALTER TABLE ALTER COLUMN SET/DROP DEFAULT is implemented with a
2344 * couple of deletion/creation of the attribute's default entry, so the
2345 * callee should check existence of an older version of this entry if it
2346 * needs to distinguish.
2347 */
2348 InvokeObjectPostCreateHookArg(AttrDefaultRelationId,
2349 RelationGetRelid(rel), attnum, is_internal);
2350
2351 return attrdefOid;
2352 }
2353
2354 /*
2355 * Store a check-constraint expression for the given relation.
2356 *
2357 * Caller is responsible for updating the count of constraints
2358 * in the pg_class entry for the relation.
2359 *
2360 * The OID of the new constraint is returned.
2361 */
2362 static Oid
StoreRelCheck(Relation rel,const char * ccname,Node * expr,bool is_validated,bool is_local,int inhcount,bool is_no_inherit,bool is_internal)2363 StoreRelCheck(Relation rel, const char *ccname, Node *expr,
2364 bool is_validated, bool is_local, int inhcount,
2365 bool is_no_inherit, bool is_internal)
2366 {
2367 char *ccbin;
2368 List *varList;
2369 int keycount;
2370 int16 *attNos;
2371 Oid constrOid;
2372
2373 /*
2374 * Flatten expression to string form for storage.
2375 */
2376 ccbin = nodeToString(expr);
2377
2378 /*
2379 * Find columns of rel that are used in expr
2380 *
2381 * NB: pull_var_clause is okay here only because we don't allow subselects
2382 * in check constraints; it would fail to examine the contents of
2383 * subselects.
2384 */
2385 varList = pull_var_clause(expr, 0);
2386 keycount = list_length(varList);
2387
2388 if (keycount > 0)
2389 {
2390 ListCell *vl;
2391 int i = 0;
2392
2393 attNos = (int16 *) palloc(keycount * sizeof(int16));
2394 foreach(vl, varList)
2395 {
2396 Var *var = (Var *) lfirst(vl);
2397 int j;
2398
2399 for (j = 0; j < i; j++)
2400 if (attNos[j] == var->varattno)
2401 break;
2402 if (j == i)
2403 attNos[i++] = var->varattno;
2404 }
2405 keycount = i;
2406 }
2407 else
2408 attNos = NULL;
2409
2410 /*
2411 * Partitioned tables do not contain any rows themselves, so a NO INHERIT
2412 * constraint makes no sense.
2413 */
2414 if (is_no_inherit &&
2415 rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2416 ereport(ERROR,
2417 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2418 errmsg("cannot add NO INHERIT constraint to partitioned table \"%s\"",
2419 RelationGetRelationName(rel))));
2420
2421 /*
2422 * Create the Check Constraint
2423 */
2424 constrOid =
2425 CreateConstraintEntry(ccname, /* Constraint Name */
2426 RelationGetNamespace(rel), /* namespace */
2427 CONSTRAINT_CHECK, /* Constraint Type */
2428 false, /* Is Deferrable */
2429 false, /* Is Deferred */
2430 is_validated,
2431 InvalidOid, /* no parent constraint */
2432 RelationGetRelid(rel), /* relation */
2433 attNos, /* attrs in the constraint */
2434 keycount, /* # key attrs in the constraint */
2435 keycount, /* # total attrs in the constraint */
2436 InvalidOid, /* not a domain constraint */
2437 InvalidOid, /* no associated index */
2438 InvalidOid, /* Foreign key fields */
2439 NULL,
2440 NULL,
2441 NULL,
2442 NULL,
2443 0,
2444 ' ',
2445 ' ',
2446 ' ',
2447 NULL, /* not an exclusion constraint */
2448 expr, /* Tree form of check constraint */
2449 ccbin, /* Binary form of check constraint */
2450 is_local, /* conislocal */
2451 inhcount, /* coninhcount */
2452 is_no_inherit, /* connoinherit */
2453 is_internal); /* internally constructed? */
2454
2455 pfree(ccbin);
2456
2457 return constrOid;
2458 }
2459
2460 /*
2461 * Store defaults and constraints (passed as a list of CookedConstraint).
2462 *
2463 * Each CookedConstraint struct is modified to store the new catalog tuple OID.
2464 *
2465 * NOTE: only pre-cooked expressions will be passed this way, which is to
2466 * say expressions inherited from an existing relation. Newly parsed
2467 * expressions can be added later, by direct calls to StoreAttrDefault
2468 * and StoreRelCheck (see AddRelationNewConstraints()).
2469 */
2470 static void
StoreConstraints(Relation rel,List * cooked_constraints,bool is_internal)2471 StoreConstraints(Relation rel, List *cooked_constraints, bool is_internal)
2472 {
2473 int numchecks = 0;
2474 ListCell *lc;
2475
2476 if (cooked_constraints == NIL)
2477 return; /* nothing to do */
2478
2479 /*
2480 * Deparsing of constraint expressions will fail unless the just-created
2481 * pg_attribute tuples for this relation are made visible. So, bump the
2482 * command counter. CAUTION: this will cause a relcache entry rebuild.
2483 */
2484 CommandCounterIncrement();
2485
2486 foreach(lc, cooked_constraints)
2487 {
2488 CookedConstraint *con = (CookedConstraint *) lfirst(lc);
2489
2490 switch (con->contype)
2491 {
2492 case CONSTR_DEFAULT:
2493 con->conoid = StoreAttrDefault(rel, con->attnum, con->expr,
2494 is_internal, false);
2495 break;
2496 case CONSTR_CHECK:
2497 con->conoid =
2498 StoreRelCheck(rel, con->name, con->expr,
2499 !con->skip_validation, con->is_local,
2500 con->inhcount, con->is_no_inherit,
2501 is_internal);
2502 numchecks++;
2503 break;
2504 default:
2505 elog(ERROR, "unrecognized constraint type: %d",
2506 (int) con->contype);
2507 }
2508 }
2509
2510 if (numchecks > 0)
2511 SetRelationNumChecks(rel, numchecks);
2512 }
2513
2514 /*
2515 * AddRelationNewConstraints
2516 *
2517 * Add new column default expressions and/or constraint check expressions
2518 * to an existing relation. This is defined to do both for efficiency in
2519 * DefineRelation, but of course you can do just one or the other by passing
2520 * empty lists.
2521 *
2522 * rel: relation to be modified
2523 * newColDefaults: list of RawColumnDefault structures
2524 * newConstraints: list of Constraint nodes
2525 * allow_merge: true if check constraints may be merged with existing ones
2526 * is_local: true if definition is local, false if it's inherited
2527 * is_internal: true if result of some internal process, not a user request
2528 *
2529 * All entries in newColDefaults will be processed. Entries in newConstraints
2530 * will be processed only if they are CONSTR_CHECK type.
2531 *
2532 * Returns a list of CookedConstraint nodes that shows the cooked form of
2533 * the default and constraint expressions added to the relation.
2534 *
2535 * NB: caller should have opened rel with AccessExclusiveLock, and should
2536 * hold that lock till end of transaction. Also, we assume the caller has
2537 * done a CommandCounterIncrement if necessary to make the relation's catalog
2538 * tuples visible.
2539 */
2540 List *
AddRelationNewConstraints(Relation rel,List * newColDefaults,List * newConstraints,bool allow_merge,bool is_local,bool is_internal,const char * queryString)2541 AddRelationNewConstraints(Relation rel,
2542 List *newColDefaults,
2543 List *newConstraints,
2544 bool allow_merge,
2545 bool is_local,
2546 bool is_internal,
2547 const char *queryString)
2548 {
2549 List *cookedConstraints = NIL;
2550 TupleDesc tupleDesc;
2551 TupleConstr *oldconstr;
2552 int numoldchecks;
2553 ParseState *pstate;
2554 RangeTblEntry *rte;
2555 int numchecks;
2556 List *checknames;
2557 ListCell *cell;
2558 Node *expr;
2559 CookedConstraint *cooked;
2560
2561 /*
2562 * Get info about existing constraints.
2563 */
2564 tupleDesc = RelationGetDescr(rel);
2565 oldconstr = tupleDesc->constr;
2566 if (oldconstr)
2567 numoldchecks = oldconstr->num_check;
2568 else
2569 numoldchecks = 0;
2570
2571 /*
2572 * Create a dummy ParseState and insert the target relation as its sole
2573 * rangetable entry. We need a ParseState for transformExpr.
2574 */
2575 pstate = make_parsestate(NULL);
2576 pstate->p_sourcetext = queryString;
2577 rte = addRangeTableEntryForRelation(pstate,
2578 rel,
2579 AccessShareLock,
2580 NULL,
2581 false,
2582 true);
2583 addRTEtoQuery(pstate, rte, true, true, true);
2584
2585 /*
2586 * Process column default expressions.
2587 */
2588 foreach(cell, newColDefaults)
2589 {
2590 RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
2591 Form_pg_attribute atp = TupleDescAttr(rel->rd_att, colDef->attnum - 1);
2592 Oid defOid;
2593
2594 expr = cookDefault(pstate, colDef->raw_default,
2595 atp->atttypid, atp->atttypmod,
2596 NameStr(atp->attname),
2597 atp->attgenerated);
2598
2599 /*
2600 * If the expression is just a NULL constant, we do not bother to make
2601 * an explicit pg_attrdef entry, since the default behavior is
2602 * equivalent. This applies to column defaults, but not for
2603 * generation expressions.
2604 *
2605 * Note a nonobvious property of this test: if the column is of a
2606 * domain type, what we'll get is not a bare null Const but a
2607 * CoerceToDomain expr, so we will not discard the default. This is
2608 * critical because the column default needs to be retained to
2609 * override any default that the domain might have.
2610 */
2611 if (expr == NULL ||
2612 (!colDef->generated &&
2613 IsA(expr, Const) &&
2614 castNode(Const, expr)->constisnull))
2615 continue;
2616
2617 /* If the DEFAULT is volatile we cannot use a missing value */
2618 if (colDef->missingMode && contain_volatile_functions((Node *) expr))
2619 colDef->missingMode = false;
2620
2621 defOid = StoreAttrDefault(rel, colDef->attnum, expr, is_internal,
2622 colDef->missingMode);
2623
2624 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2625 cooked->contype = CONSTR_DEFAULT;
2626 cooked->conoid = defOid;
2627 cooked->name = NULL;
2628 cooked->attnum = colDef->attnum;
2629 cooked->expr = expr;
2630 cooked->skip_validation = false;
2631 cooked->is_local = is_local;
2632 cooked->inhcount = is_local ? 0 : 1;
2633 cooked->is_no_inherit = false;
2634 cookedConstraints = lappend(cookedConstraints, cooked);
2635 }
2636
2637 /*
2638 * Process constraint expressions.
2639 */
2640 numchecks = numoldchecks;
2641 checknames = NIL;
2642 foreach(cell, newConstraints)
2643 {
2644 Constraint *cdef = (Constraint *) lfirst(cell);
2645 char *ccname;
2646 Oid constrOid;
2647
2648 if (cdef->contype != CONSTR_CHECK)
2649 continue;
2650
2651 if (cdef->raw_expr != NULL)
2652 {
2653 Assert(cdef->cooked_expr == NULL);
2654
2655 /*
2656 * Transform raw parsetree to executable expression, and verify
2657 * it's valid as a CHECK constraint.
2658 */
2659 expr = cookConstraint(pstate, cdef->raw_expr,
2660 RelationGetRelationName(rel));
2661 }
2662 else
2663 {
2664 Assert(cdef->cooked_expr != NULL);
2665
2666 /*
2667 * Here, we assume the parser will only pass us valid CHECK
2668 * expressions, so we do no particular checking.
2669 */
2670 expr = stringToNode(cdef->cooked_expr);
2671 }
2672
2673 /*
2674 * Check name uniqueness, or generate a name if none was given.
2675 */
2676 if (cdef->conname != NULL)
2677 {
2678 ListCell *cell2;
2679
2680 ccname = cdef->conname;
2681 /* Check against other new constraints */
2682 /* Needed because we don't do CommandCounterIncrement in loop */
2683 foreach(cell2, checknames)
2684 {
2685 if (strcmp((char *) lfirst(cell2), ccname) == 0)
2686 ereport(ERROR,
2687 (errcode(ERRCODE_DUPLICATE_OBJECT),
2688 errmsg("check constraint \"%s\" already exists",
2689 ccname)));
2690 }
2691
2692 /* save name for future checks */
2693 checknames = lappend(checknames, ccname);
2694
2695 /*
2696 * Check against pre-existing constraints. If we are allowed to
2697 * merge with an existing constraint, there's no more to do here.
2698 * (We omit the duplicate constraint from the result, which is
2699 * what ATAddCheckConstraint wants.)
2700 */
2701 if (MergeWithExistingConstraint(rel, ccname, expr,
2702 allow_merge, is_local,
2703 cdef->initially_valid,
2704 cdef->is_no_inherit))
2705 continue;
2706 }
2707 else
2708 {
2709 /*
2710 * When generating a name, we want to create "tab_col_check" for a
2711 * column constraint and "tab_check" for a table constraint. We
2712 * no longer have any info about the syntactic positioning of the
2713 * constraint phrase, so we approximate this by seeing whether the
2714 * expression references more than one column. (If the user
2715 * played by the rules, the result is the same...)
2716 *
2717 * Note: pull_var_clause() doesn't descend into sublinks, but we
2718 * eliminated those above; and anyway this only needs to be an
2719 * approximate answer.
2720 */
2721 List *vars;
2722 char *colname;
2723
2724 vars = pull_var_clause(expr, 0);
2725
2726 /* eliminate duplicates */
2727 vars = list_union(NIL, vars);
2728
2729 if (list_length(vars) == 1)
2730 colname = get_attname(RelationGetRelid(rel),
2731 ((Var *) linitial(vars))->varattno,
2732 true);
2733 else
2734 colname = NULL;
2735
2736 ccname = ChooseConstraintName(RelationGetRelationName(rel),
2737 colname,
2738 "check",
2739 RelationGetNamespace(rel),
2740 checknames);
2741
2742 /* save name for future checks */
2743 checknames = lappend(checknames, ccname);
2744 }
2745
2746 /*
2747 * OK, store it.
2748 */
2749 constrOid =
2750 StoreRelCheck(rel, ccname, expr, cdef->initially_valid, is_local,
2751 is_local ? 0 : 1, cdef->is_no_inherit, is_internal);
2752
2753 numchecks++;
2754
2755 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2756 cooked->contype = CONSTR_CHECK;
2757 cooked->conoid = constrOid;
2758 cooked->name = ccname;
2759 cooked->attnum = 0;
2760 cooked->expr = expr;
2761 cooked->skip_validation = cdef->skip_validation;
2762 cooked->is_local = is_local;
2763 cooked->inhcount = is_local ? 0 : 1;
2764 cooked->is_no_inherit = cdef->is_no_inherit;
2765 cookedConstraints = lappend(cookedConstraints, cooked);
2766 }
2767
2768 /*
2769 * Update the count of constraints in the relation's pg_class tuple. We do
2770 * this even if there was no change, in order to ensure that an SI update
2771 * message is sent out for the pg_class tuple, which will force other
2772 * backends to rebuild their relcache entries for the rel. (This is
2773 * critical if we added defaults but not constraints.)
2774 */
2775 SetRelationNumChecks(rel, numchecks);
2776
2777 return cookedConstraints;
2778 }
2779
2780 /*
2781 * Check for a pre-existing check constraint that conflicts with a proposed
2782 * new one, and either adjust its conislocal/coninhcount settings or throw
2783 * error as needed.
2784 *
2785 * Returns true if merged (constraint is a duplicate), or false if it's
2786 * got a so-far-unique name, or throws error if conflict.
2787 *
2788 * XXX See MergeConstraintsIntoExisting too if you change this code.
2789 */
2790 static bool
MergeWithExistingConstraint(Relation rel,const char * ccname,Node * expr,bool allow_merge,bool is_local,bool is_initially_valid,bool is_no_inherit)2791 MergeWithExistingConstraint(Relation rel, const char *ccname, Node *expr,
2792 bool allow_merge, bool is_local,
2793 bool is_initially_valid,
2794 bool is_no_inherit)
2795 {
2796 bool found;
2797 Relation conDesc;
2798 SysScanDesc conscan;
2799 ScanKeyData skey[3];
2800 HeapTuple tup;
2801
2802 /* Search for a pg_constraint entry with same name and relation */
2803 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
2804
2805 found = false;
2806
2807 ScanKeyInit(&skey[0],
2808 Anum_pg_constraint_conrelid,
2809 BTEqualStrategyNumber, F_OIDEQ,
2810 ObjectIdGetDatum(RelationGetRelid(rel)));
2811 ScanKeyInit(&skey[1],
2812 Anum_pg_constraint_contypid,
2813 BTEqualStrategyNumber, F_OIDEQ,
2814 ObjectIdGetDatum(InvalidOid));
2815 ScanKeyInit(&skey[2],
2816 Anum_pg_constraint_conname,
2817 BTEqualStrategyNumber, F_NAMEEQ,
2818 CStringGetDatum(ccname));
2819
2820 conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId, true,
2821 NULL, 3, skey);
2822
2823 /* There can be at most one matching row */
2824 if (HeapTupleIsValid(tup = systable_getnext(conscan)))
2825 {
2826 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
2827
2828 /* Found it. Conflicts if not identical check constraint */
2829 if (con->contype == CONSTRAINT_CHECK)
2830 {
2831 Datum val;
2832 bool isnull;
2833
2834 val = fastgetattr(tup,
2835 Anum_pg_constraint_conbin,
2836 conDesc->rd_att, &isnull);
2837 if (isnull)
2838 elog(ERROR, "null conbin for rel %s",
2839 RelationGetRelationName(rel));
2840 if (equal(expr, stringToNode(TextDatumGetCString(val))))
2841 found = true;
2842 }
2843
2844 /*
2845 * If the existing constraint is purely inherited (no local
2846 * definition) then interpret addition of a local constraint as a
2847 * legal merge. This allows ALTER ADD CONSTRAINT on parent and child
2848 * tables to be given in either order with same end state. However if
2849 * the relation is a partition, all inherited constraints are always
2850 * non-local, including those that were merged.
2851 */
2852 if (is_local && !con->conislocal && !rel->rd_rel->relispartition)
2853 allow_merge = true;
2854
2855 if (!found || !allow_merge)
2856 ereport(ERROR,
2857 (errcode(ERRCODE_DUPLICATE_OBJECT),
2858 errmsg("constraint \"%s\" for relation \"%s\" already exists",
2859 ccname, RelationGetRelationName(rel))));
2860
2861 /* If the child constraint is "no inherit" then cannot merge */
2862 if (con->connoinherit)
2863 ereport(ERROR,
2864 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2865 errmsg("constraint \"%s\" conflicts with non-inherited constraint on relation \"%s\"",
2866 ccname, RelationGetRelationName(rel))));
2867
2868 /*
2869 * Must not change an existing inherited constraint to "no inherit"
2870 * status. That's because inherited constraints should be able to
2871 * propagate to lower-level children.
2872 */
2873 if (con->coninhcount > 0 && is_no_inherit)
2874 ereport(ERROR,
2875 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2876 errmsg("constraint \"%s\" conflicts with inherited constraint on relation \"%s\"",
2877 ccname, RelationGetRelationName(rel))));
2878
2879 /*
2880 * If the child constraint is "not valid" then cannot merge with a
2881 * valid parent constraint.
2882 */
2883 if (is_initially_valid && !con->convalidated)
2884 ereport(ERROR,
2885 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2886 errmsg("constraint \"%s\" conflicts with NOT VALID constraint on relation \"%s\"",
2887 ccname, RelationGetRelationName(rel))));
2888
2889 /* OK to update the tuple */
2890 ereport(NOTICE,
2891 (errmsg("merging constraint \"%s\" with inherited definition",
2892 ccname)));
2893
2894 tup = heap_copytuple(tup);
2895 con = (Form_pg_constraint) GETSTRUCT(tup);
2896
2897 /*
2898 * In case of partitions, an inherited constraint must be inherited
2899 * only once since it cannot have multiple parents and it is never
2900 * considered local.
2901 */
2902 if (rel->rd_rel->relispartition)
2903 {
2904 con->coninhcount = 1;
2905 con->conislocal = false;
2906 }
2907 else
2908 {
2909 if (is_local)
2910 con->conislocal = true;
2911 else
2912 con->coninhcount++;
2913 }
2914
2915 if (is_no_inherit)
2916 {
2917 Assert(is_local);
2918 con->connoinherit = true;
2919 }
2920
2921 CatalogTupleUpdate(conDesc, &tup->t_self, tup);
2922 }
2923
2924 systable_endscan(conscan);
2925 table_close(conDesc, RowExclusiveLock);
2926
2927 return found;
2928 }
2929
2930 /*
2931 * Update the count of constraints in the relation's pg_class tuple.
2932 *
2933 * Caller had better hold exclusive lock on the relation.
2934 *
2935 * An important side effect is that a SI update message will be sent out for
2936 * the pg_class tuple, which will force other backends to rebuild their
2937 * relcache entries for the rel. Also, this backend will rebuild its
2938 * own relcache entry at the next CommandCounterIncrement.
2939 */
2940 static void
SetRelationNumChecks(Relation rel,int numchecks)2941 SetRelationNumChecks(Relation rel, int numchecks)
2942 {
2943 Relation relrel;
2944 HeapTuple reltup;
2945 Form_pg_class relStruct;
2946
2947 relrel = table_open(RelationRelationId, RowExclusiveLock);
2948 reltup = SearchSysCacheCopy1(RELOID,
2949 ObjectIdGetDatum(RelationGetRelid(rel)));
2950 if (!HeapTupleIsValid(reltup))
2951 elog(ERROR, "cache lookup failed for relation %u",
2952 RelationGetRelid(rel));
2953 relStruct = (Form_pg_class) GETSTRUCT(reltup);
2954
2955 if (relStruct->relchecks != numchecks)
2956 {
2957 relStruct->relchecks = numchecks;
2958
2959 CatalogTupleUpdate(relrel, &reltup->t_self, reltup);
2960 }
2961 else
2962 {
2963 /* Skip the disk update, but force relcache inval anyway */
2964 CacheInvalidateRelcache(rel);
2965 }
2966
2967 heap_freetuple(reltup);
2968 table_close(relrel, RowExclusiveLock);
2969 }
2970
2971 /*
2972 * Check for references to generated columns
2973 */
2974 static bool
check_nested_generated_walker(Node * node,void * context)2975 check_nested_generated_walker(Node *node, void *context)
2976 {
2977 ParseState *pstate = context;
2978
2979 if (node == NULL)
2980 return false;
2981 else if (IsA(node, Var))
2982 {
2983 Var *var = (Var *) node;
2984 Oid relid;
2985 AttrNumber attnum;
2986
2987 relid = rt_fetch(var->varno, pstate->p_rtable)->relid;
2988 if (!OidIsValid(relid))
2989 return false; /* XXX shouldn't we raise an error? */
2990
2991 attnum = var->varattno;
2992
2993 if (attnum > 0 && get_attgenerated(relid, attnum))
2994 ereport(ERROR,
2995 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2996 errmsg("cannot use generated column \"%s\" in column generation expression",
2997 get_attname(relid, attnum, false)),
2998 errdetail("A generated column cannot reference another generated column."),
2999 parser_errposition(pstate, var->location)));
3000 /* A whole-row Var is necessarily self-referential, so forbid it */
3001 if (attnum == 0)
3002 ereport(ERROR,
3003 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3004 errmsg("cannot use whole-row variable in column generation expression"),
3005 errdetail("This would cause the generated column to depend on its own value."),
3006 parser_errposition(pstate, var->location)));
3007 /* System columns were already checked in the parser */
3008
3009 return false;
3010 }
3011 else
3012 return expression_tree_walker(node, check_nested_generated_walker,
3013 (void *) context);
3014 }
3015
3016 static void
check_nested_generated(ParseState * pstate,Node * node)3017 check_nested_generated(ParseState *pstate, Node *node)
3018 {
3019 check_nested_generated_walker(node, pstate);
3020 }
3021
3022 /*
3023 * Take a raw default and convert it to a cooked format ready for
3024 * storage.
3025 *
3026 * Parse state should be set up to recognize any vars that might appear
3027 * in the expression. (Even though we plan to reject vars, it's more
3028 * user-friendly to give the correct error message than "unknown var".)
3029 *
3030 * If atttypid is not InvalidOid, coerce the expression to the specified
3031 * type (and typmod atttypmod). attname is only needed in this case:
3032 * it is used in the error message, if any.
3033 */
3034 Node *
cookDefault(ParseState * pstate,Node * raw_default,Oid atttypid,int32 atttypmod,const char * attname,char attgenerated)3035 cookDefault(ParseState *pstate,
3036 Node *raw_default,
3037 Oid atttypid,
3038 int32 atttypmod,
3039 const char *attname,
3040 char attgenerated)
3041 {
3042 Node *expr;
3043
3044 Assert(raw_default != NULL);
3045
3046 /*
3047 * Transform raw parsetree to executable expression.
3048 */
3049 expr = transformExpr(pstate, raw_default, attgenerated ? EXPR_KIND_GENERATED_COLUMN : EXPR_KIND_COLUMN_DEFAULT);
3050
3051 if (attgenerated)
3052 {
3053 check_nested_generated(pstate, expr);
3054
3055 if (contain_mutable_functions(expr))
3056 ereport(ERROR,
3057 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3058 errmsg("generation expression is not immutable")));
3059 }
3060 else
3061 {
3062 /*
3063 * For a default expression, transformExpr() should have rejected
3064 * column references.
3065 */
3066 Assert(!contain_var_clause(expr));
3067 }
3068
3069 /*
3070 * Coerce the expression to the correct type and typmod, if given. This
3071 * should match the parser's processing of non-defaulted expressions ---
3072 * see transformAssignedExpr().
3073 */
3074 if (OidIsValid(atttypid))
3075 {
3076 Oid type_id = exprType(expr);
3077
3078 expr = coerce_to_target_type(pstate, expr, type_id,
3079 atttypid, atttypmod,
3080 COERCION_ASSIGNMENT,
3081 COERCE_IMPLICIT_CAST,
3082 -1);
3083 if (expr == NULL)
3084 ereport(ERROR,
3085 (errcode(ERRCODE_DATATYPE_MISMATCH),
3086 errmsg("column \"%s\" is of type %s"
3087 " but default expression is of type %s",
3088 attname,
3089 format_type_be(atttypid),
3090 format_type_be(type_id)),
3091 errhint("You will need to rewrite or cast the expression.")));
3092 }
3093
3094 /*
3095 * Finally, take care of collations in the finished expression.
3096 */
3097 assign_expr_collations(pstate, expr);
3098
3099 return expr;
3100 }
3101
3102 /*
3103 * Take a raw CHECK constraint expression and convert it to a cooked format
3104 * ready for storage.
3105 *
3106 * Parse state must be set up to recognize any vars that might appear
3107 * in the expression.
3108 */
3109 static Node *
cookConstraint(ParseState * pstate,Node * raw_constraint,char * relname)3110 cookConstraint(ParseState *pstate,
3111 Node *raw_constraint,
3112 char *relname)
3113 {
3114 Node *expr;
3115
3116 /*
3117 * Transform raw parsetree to executable expression.
3118 */
3119 expr = transformExpr(pstate, raw_constraint, EXPR_KIND_CHECK_CONSTRAINT);
3120
3121 /*
3122 * Make sure it yields a boolean result.
3123 */
3124 expr = coerce_to_boolean(pstate, expr, "CHECK");
3125
3126 /*
3127 * Take care of collations.
3128 */
3129 assign_expr_collations(pstate, expr);
3130
3131 /*
3132 * Make sure no outside relations are referred to (this is probably dead
3133 * code now that add_missing_from is history).
3134 */
3135 if (list_length(pstate->p_rtable) != 1)
3136 ereport(ERROR,
3137 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
3138 errmsg("only table \"%s\" can be referenced in check constraint",
3139 relname)));
3140
3141 return expr;
3142 }
3143
3144 /*
3145 * CopyStatistics --- copy entries in pg_statistic from one rel to another
3146 */
3147 void
CopyStatistics(Oid fromrelid,Oid torelid)3148 CopyStatistics(Oid fromrelid, Oid torelid)
3149 {
3150 HeapTuple tup;
3151 SysScanDesc scan;
3152 ScanKeyData key[1];
3153 Relation statrel;
3154
3155 statrel = table_open(StatisticRelationId, RowExclusiveLock);
3156
3157 /* Now search for stat records */
3158 ScanKeyInit(&key[0],
3159 Anum_pg_statistic_starelid,
3160 BTEqualStrategyNumber, F_OIDEQ,
3161 ObjectIdGetDatum(fromrelid));
3162
3163 scan = systable_beginscan(statrel, StatisticRelidAttnumInhIndexId,
3164 true, NULL, 1, key);
3165
3166 while (HeapTupleIsValid((tup = systable_getnext(scan))))
3167 {
3168 Form_pg_statistic statform;
3169
3170 /* make a modifiable copy */
3171 tup = heap_copytuple(tup);
3172 statform = (Form_pg_statistic) GETSTRUCT(tup);
3173
3174 /* update the copy of the tuple and insert it */
3175 statform->starelid = torelid;
3176 CatalogTupleInsert(statrel, tup);
3177
3178 heap_freetuple(tup);
3179 }
3180
3181 systable_endscan(scan);
3182
3183 table_close(statrel, RowExclusiveLock);
3184 }
3185
3186 /*
3187 * RemoveStatistics --- remove entries in pg_statistic for a rel or column
3188 *
3189 * If attnum is zero, remove all entries for rel; else remove only the one(s)
3190 * for that column.
3191 */
3192 void
RemoveStatistics(Oid relid,AttrNumber attnum)3193 RemoveStatistics(Oid relid, AttrNumber attnum)
3194 {
3195 Relation pgstatistic;
3196 SysScanDesc scan;
3197 ScanKeyData key[2];
3198 int nkeys;
3199 HeapTuple tuple;
3200
3201 pgstatistic = table_open(StatisticRelationId, RowExclusiveLock);
3202
3203 ScanKeyInit(&key[0],
3204 Anum_pg_statistic_starelid,
3205 BTEqualStrategyNumber, F_OIDEQ,
3206 ObjectIdGetDatum(relid));
3207
3208 if (attnum == 0)
3209 nkeys = 1;
3210 else
3211 {
3212 ScanKeyInit(&key[1],
3213 Anum_pg_statistic_staattnum,
3214 BTEqualStrategyNumber, F_INT2EQ,
3215 Int16GetDatum(attnum));
3216 nkeys = 2;
3217 }
3218
3219 scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
3220 NULL, nkeys, key);
3221
3222 /* we must loop even when attnum != 0, in case of inherited stats */
3223 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
3224 CatalogTupleDelete(pgstatistic, &tuple->t_self);
3225
3226 systable_endscan(scan);
3227
3228 table_close(pgstatistic, RowExclusiveLock);
3229 }
3230
3231
3232 /*
3233 * RelationTruncateIndexes - truncate all indexes associated
3234 * with the heap relation to zero tuples.
3235 *
3236 * The routine will truncate and then reconstruct the indexes on
3237 * the specified relation. Caller must hold exclusive lock on rel.
3238 */
3239 static void
RelationTruncateIndexes(Relation heapRelation)3240 RelationTruncateIndexes(Relation heapRelation)
3241 {
3242 ListCell *indlist;
3243
3244 /* Ask the relcache to produce a list of the indexes of the rel */
3245 foreach(indlist, RelationGetIndexList(heapRelation))
3246 {
3247 Oid indexId = lfirst_oid(indlist);
3248 Relation currentIndex;
3249 IndexInfo *indexInfo;
3250
3251 /* Open the index relation; use exclusive lock, just to be sure */
3252 currentIndex = index_open(indexId, AccessExclusiveLock);
3253
3254 /*
3255 * Fetch info needed for index_build. Since we know there are no
3256 * tuples that actually need indexing, we can use a dummy IndexInfo.
3257 * This is slightly cheaper to build, but the real point is to avoid
3258 * possibly running user-defined code in index expressions or
3259 * predicates. We might be getting invoked during ON COMMIT
3260 * processing, and we don't want to run any such code then.
3261 */
3262 indexInfo = BuildDummyIndexInfo(currentIndex);
3263
3264 /*
3265 * Now truncate the actual file (and discard buffers).
3266 */
3267 RelationTruncate(currentIndex, 0);
3268
3269 /* Initialize the index and rebuild */
3270 /* Note: we do not need to re-establish pkey setting */
3271 index_build(heapRelation, currentIndex, indexInfo, true, false);
3272
3273 /* We're done with this index */
3274 index_close(currentIndex, NoLock);
3275 }
3276 }
3277
3278 /*
3279 * heap_truncate
3280 *
3281 * This routine deletes all data within all the specified relations.
3282 *
3283 * This is not transaction-safe! There is another, transaction-safe
3284 * implementation in commands/tablecmds.c. We now use this only for
3285 * ON COMMIT truncation of temporary tables, where it doesn't matter.
3286 */
3287 void
heap_truncate(List * relids)3288 heap_truncate(List *relids)
3289 {
3290 List *relations = NIL;
3291 ListCell *cell;
3292
3293 /* Open relations for processing, and grab exclusive access on each */
3294 foreach(cell, relids)
3295 {
3296 Oid rid = lfirst_oid(cell);
3297 Relation rel;
3298
3299 rel = table_open(rid, AccessExclusiveLock);
3300 relations = lappend(relations, rel);
3301 }
3302
3303 /* Don't allow truncate on tables that are referenced by foreign keys */
3304 heap_truncate_check_FKs(relations, true);
3305
3306 /* OK to do it */
3307 foreach(cell, relations)
3308 {
3309 Relation rel = lfirst(cell);
3310
3311 /* Truncate the relation */
3312 heap_truncate_one_rel(rel);
3313
3314 /* Close the relation, but keep exclusive lock on it until commit */
3315 table_close(rel, NoLock);
3316 }
3317 }
3318
3319 /*
3320 * heap_truncate_one_rel
3321 *
3322 * This routine deletes all data within the specified relation.
3323 *
3324 * This is not transaction-safe, because the truncation is done immediately
3325 * and cannot be rolled back later. Caller is responsible for having
3326 * checked permissions etc, and must have obtained AccessExclusiveLock.
3327 */
3328 void
heap_truncate_one_rel(Relation rel)3329 heap_truncate_one_rel(Relation rel)
3330 {
3331 Oid toastrelid;
3332
3333 /*
3334 * Truncate the relation. Partitioned tables have no storage, so there is
3335 * nothing to do for them here.
3336 */
3337 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3338 return;
3339
3340 /* Truncate the underlying relation */
3341 table_relation_nontransactional_truncate(rel);
3342
3343 /* If the relation has indexes, truncate the indexes too */
3344 RelationTruncateIndexes(rel);
3345
3346 /* If there is a toast table, truncate that too */
3347 toastrelid = rel->rd_rel->reltoastrelid;
3348 if (OidIsValid(toastrelid))
3349 {
3350 Relation toastrel = table_open(toastrelid, AccessExclusiveLock);
3351
3352 table_relation_nontransactional_truncate(toastrel);
3353 RelationTruncateIndexes(toastrel);
3354 /* keep the lock... */
3355 table_close(toastrel, NoLock);
3356 }
3357 }
3358
3359 /*
3360 * heap_truncate_check_FKs
3361 * Check for foreign keys referencing a list of relations that
3362 * are to be truncated, and raise error if there are any
3363 *
3364 * We disallow such FKs (except self-referential ones) since the whole point
3365 * of TRUNCATE is to not scan the individual rows to be thrown away.
3366 *
3367 * This is split out so it can be shared by both implementations of truncate.
3368 * Caller should already hold a suitable lock on the relations.
3369 *
3370 * tempTables is only used to select an appropriate error message.
3371 */
3372 void
heap_truncate_check_FKs(List * relations,bool tempTables)3373 heap_truncate_check_FKs(List *relations, bool tempTables)
3374 {
3375 List *oids = NIL;
3376 List *dependents;
3377 ListCell *cell;
3378
3379 /*
3380 * Build a list of OIDs of the interesting relations.
3381 *
3382 * If a relation has no triggers, then it can neither have FKs nor be
3383 * referenced by a FK from another table, so we can ignore it. For
3384 * partitioned tables, FKs have no triggers, so we must include them
3385 * anyway.
3386 */
3387 foreach(cell, relations)
3388 {
3389 Relation rel = lfirst(cell);
3390
3391 if (rel->rd_rel->relhastriggers ||
3392 rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3393 oids = lappend_oid(oids, RelationGetRelid(rel));
3394 }
3395
3396 /*
3397 * Fast path: if no relation has triggers, none has FKs either.
3398 */
3399 if (oids == NIL)
3400 return;
3401
3402 /*
3403 * Otherwise, must scan pg_constraint. We make one pass with all the
3404 * relations considered; if this finds nothing, then all is well.
3405 */
3406 dependents = heap_truncate_find_FKs(oids);
3407 if (dependents == NIL)
3408 return;
3409
3410 /*
3411 * Otherwise we repeat the scan once per relation to identify a particular
3412 * pair of relations to complain about. This is pretty slow, but
3413 * performance shouldn't matter much in a failure path. The reason for
3414 * doing things this way is to ensure that the message produced is not
3415 * dependent on chance row locations within pg_constraint.
3416 */
3417 foreach(cell, oids)
3418 {
3419 Oid relid = lfirst_oid(cell);
3420 ListCell *cell2;
3421
3422 dependents = heap_truncate_find_FKs(list_make1_oid(relid));
3423
3424 foreach(cell2, dependents)
3425 {
3426 Oid relid2 = lfirst_oid(cell2);
3427
3428 if (!list_member_oid(oids, relid2))
3429 {
3430 char *relname = get_rel_name(relid);
3431 char *relname2 = get_rel_name(relid2);
3432
3433 if (tempTables)
3434 ereport(ERROR,
3435 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3436 errmsg("unsupported ON COMMIT and foreign key combination"),
3437 errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
3438 relname2, relname)));
3439 else
3440 ereport(ERROR,
3441 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3442 errmsg("cannot truncate a table referenced in a foreign key constraint"),
3443 errdetail("Table \"%s\" references \"%s\".",
3444 relname2, relname),
3445 errhint("Truncate table \"%s\" at the same time, "
3446 "or use TRUNCATE ... CASCADE.",
3447 relname2)));
3448 }
3449 }
3450 }
3451 }
3452
3453 /*
3454 * heap_truncate_find_FKs
3455 * Find relations having foreign keys referencing any of the given rels
3456 *
3457 * Input and result are both lists of relation OIDs. The result contains
3458 * no duplicates, does *not* include any rels that were already in the input
3459 * list, and is sorted in OID order. (The last property is enforced mainly
3460 * to guarantee consistent behavior in the regression tests; we don't want
3461 * behavior to change depending on chance locations of rows in pg_constraint.)
3462 *
3463 * Note: caller should already have appropriate lock on all rels mentioned
3464 * in relationIds. Since adding or dropping an FK requires exclusive lock
3465 * on both rels, this ensures that the answer will be stable.
3466 */
3467 List *
heap_truncate_find_FKs(List * relationIds)3468 heap_truncate_find_FKs(List *relationIds)
3469 {
3470 List *result = NIL;
3471 List *oids = list_copy(relationIds);
3472 List *parent_cons;
3473 ListCell *cell;
3474 ScanKeyData key;
3475 Relation fkeyRel;
3476 SysScanDesc fkeyScan;
3477 HeapTuple tuple;
3478 bool restart;
3479
3480 oids = list_copy(relationIds);
3481
3482 /*
3483 * Must scan pg_constraint. Right now, it is a seqscan because there is
3484 * no available index on confrelid.
3485 */
3486 fkeyRel = table_open(ConstraintRelationId, AccessShareLock);
3487
3488 restart:
3489 restart = false;
3490 parent_cons = NIL;
3491
3492 fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
3493 NULL, 0, NULL);
3494
3495 while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
3496 {
3497 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
3498
3499 /* Not a foreign key */
3500 if (con->contype != CONSTRAINT_FOREIGN)
3501 continue;
3502
3503 /* Not referencing one of our list of tables */
3504 if (!list_member_oid(oids, con->confrelid))
3505 continue;
3506
3507 /*
3508 * If this constraint has a parent constraint which we have not seen
3509 * yet, keep track of it for the second loop, below. Tracking parent
3510 * constraints allows us to climb up to the top-level level constraint
3511 * and look for all possible relations referencing the partitioned
3512 * table.
3513 */
3514 if (OidIsValid(con->conparentid) &&
3515 !list_member_oid(parent_cons, con->conparentid))
3516 parent_cons = lappend_oid(parent_cons, con->conparentid);
3517
3518 /*
3519 * Add referencer to result, unless already present in input or result
3520 * list.
3521 */
3522 if (!list_member_oid(relationIds, con->conrelid))
3523 result = insert_ordered_unique_oid(result, con->conrelid);
3524 }
3525
3526 systable_endscan(fkeyScan);
3527
3528 /*
3529 * Process each parent constraint we found to add the list of referenced
3530 * relations by them to the oids list. If we do add any new such
3531 * relations, redo the first loop above. Also, if we see that the parent
3532 * constraint in turn has a parent, add that so that we process all
3533 * relations in a single additional pass.
3534 */
3535 foreach(cell, parent_cons)
3536 {
3537 Oid parent = lfirst_oid(cell);
3538
3539 ScanKeyInit(&key,
3540 Anum_pg_constraint_oid,
3541 BTEqualStrategyNumber, F_OIDEQ,
3542 ObjectIdGetDatum(parent));
3543
3544 fkeyScan = systable_beginscan(fkeyRel, ConstraintOidIndexId,
3545 true, NULL, 1, &key);
3546
3547 tuple = systable_getnext(fkeyScan);
3548 if (HeapTupleIsValid(tuple))
3549 {
3550 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
3551
3552 /*
3553 * pg_constraint rows always appear for partitioned hierarchies
3554 * this way: on the each side of the constraint, one row appears
3555 * for each partition that points to the top-most table on the
3556 * other side.
3557 *
3558 * Because of this arrangement, we can correctly catch all
3559 * relevant relations by adding to 'parent_cons' all rows with
3560 * valid conparentid, and to the 'oids' list all rows with a
3561 * zero conparentid. If any oids are added to 'oids', redo the
3562 * first loop above by setting 'restart'.
3563 */
3564 if (OidIsValid(con->conparentid))
3565 parent_cons = list_append_unique_oid(parent_cons,
3566 con->conparentid);
3567 else if (!list_member_oid(oids, con->confrelid))
3568 {
3569 oids = lappend_oid(oids, con->confrelid);
3570 restart = true;
3571 }
3572 }
3573
3574 systable_endscan(fkeyScan);
3575 }
3576
3577 list_free(parent_cons);
3578 if (restart)
3579 goto restart;
3580
3581 table_close(fkeyRel, AccessShareLock);
3582 list_free(oids);
3583
3584 return result;
3585 }
3586
3587 /*
3588 * insert_ordered_unique_oid
3589 * Insert a new Oid into a sorted list of Oids, preserving ordering,
3590 * and eliminating duplicates
3591 *
3592 * Building the ordered list this way is O(N^2), but with a pretty small
3593 * constant, so for the number of entries we expect it will probably be
3594 * faster than trying to apply qsort(). It seems unlikely someone would be
3595 * trying to truncate a table with thousands of dependent tables ...
3596 */
3597 static List *
insert_ordered_unique_oid(List * list,Oid datum)3598 insert_ordered_unique_oid(List *list, Oid datum)
3599 {
3600 ListCell *prev;
3601
3602 /* Does the datum belong at the front? */
3603 if (list == NIL || datum < linitial_oid(list))
3604 return lcons_oid(datum, list);
3605 /* Does it match the first entry? */
3606 if (datum == linitial_oid(list))
3607 return list; /* duplicate, so don't insert */
3608 /* No, so find the entry it belongs after */
3609 prev = list_head(list);
3610 for (;;)
3611 {
3612 ListCell *curr = lnext(prev);
3613
3614 if (curr == NULL || datum < lfirst_oid(curr))
3615 break; /* it belongs after 'prev', before 'curr' */
3616
3617 if (datum == lfirst_oid(curr))
3618 return list; /* duplicate, so don't insert */
3619
3620 prev = curr;
3621 }
3622 /* Insert datum into list after 'prev' */
3623 lappend_cell_oid(list, prev, datum);
3624 return list;
3625 }
3626
3627 /*
3628 * StorePartitionKey
3629 * Store information about the partition key rel into the catalog
3630 */
3631 void
StorePartitionKey(Relation rel,char strategy,int16 partnatts,AttrNumber * partattrs,List * partexprs,Oid * partopclass,Oid * partcollation)3632 StorePartitionKey(Relation rel,
3633 char strategy,
3634 int16 partnatts,
3635 AttrNumber *partattrs,
3636 List *partexprs,
3637 Oid *partopclass,
3638 Oid *partcollation)
3639 {
3640 int i;
3641 int2vector *partattrs_vec;
3642 oidvector *partopclass_vec;
3643 oidvector *partcollation_vec;
3644 Datum partexprDatum;
3645 Relation pg_partitioned_table;
3646 HeapTuple tuple;
3647 Datum values[Natts_pg_partitioned_table];
3648 bool nulls[Natts_pg_partitioned_table];
3649 ObjectAddress myself;
3650 ObjectAddress referenced;
3651
3652 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
3653
3654 /* Copy the partition attribute numbers, opclass OIDs into arrays */
3655 partattrs_vec = buildint2vector(partattrs, partnatts);
3656 partopclass_vec = buildoidvector(partopclass, partnatts);
3657 partcollation_vec = buildoidvector(partcollation, partnatts);
3658
3659 /* Convert the expressions (if any) to a text datum */
3660 if (partexprs)
3661 {
3662 char *exprString;
3663
3664 exprString = nodeToString(partexprs);
3665 partexprDatum = CStringGetTextDatum(exprString);
3666 pfree(exprString);
3667 }
3668 else
3669 partexprDatum = (Datum) 0;
3670
3671 pg_partitioned_table = table_open(PartitionedRelationId, RowExclusiveLock);
3672
3673 MemSet(nulls, false, sizeof(nulls));
3674
3675 /* Only this can ever be NULL */
3676 if (!partexprDatum)
3677 nulls[Anum_pg_partitioned_table_partexprs - 1] = true;
3678
3679 values[Anum_pg_partitioned_table_partrelid - 1] = ObjectIdGetDatum(RelationGetRelid(rel));
3680 values[Anum_pg_partitioned_table_partstrat - 1] = CharGetDatum(strategy);
3681 values[Anum_pg_partitioned_table_partnatts - 1] = Int16GetDatum(partnatts);
3682 values[Anum_pg_partitioned_table_partdefid - 1] = ObjectIdGetDatum(InvalidOid);
3683 values[Anum_pg_partitioned_table_partattrs - 1] = PointerGetDatum(partattrs_vec);
3684 values[Anum_pg_partitioned_table_partclass - 1] = PointerGetDatum(partopclass_vec);
3685 values[Anum_pg_partitioned_table_partcollation - 1] = PointerGetDatum(partcollation_vec);
3686 values[Anum_pg_partitioned_table_partexprs - 1] = partexprDatum;
3687
3688 tuple = heap_form_tuple(RelationGetDescr(pg_partitioned_table), values, nulls);
3689
3690 CatalogTupleInsert(pg_partitioned_table, tuple);
3691 table_close(pg_partitioned_table, RowExclusiveLock);
3692
3693 /* Mark this relation as dependent on a few things as follows */
3694 myself.classId = RelationRelationId;
3695 myself.objectId = RelationGetRelid(rel);
3696 myself.objectSubId = 0;
3697
3698 /* Operator class and collation per key column */
3699 for (i = 0; i < partnatts; i++)
3700 {
3701 referenced.classId = OperatorClassRelationId;
3702 referenced.objectId = partopclass[i];
3703 referenced.objectSubId = 0;
3704
3705 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3706
3707 /* The default collation is pinned, so don't bother recording it */
3708 if (OidIsValid(partcollation[i]) &&
3709 partcollation[i] != DEFAULT_COLLATION_OID)
3710 {
3711 referenced.classId = CollationRelationId;
3712 referenced.objectId = partcollation[i];
3713 referenced.objectSubId = 0;
3714
3715 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3716 }
3717 }
3718
3719 /*
3720 * The partitioning columns are made internally dependent on the table,
3721 * because we cannot drop any of them without dropping the whole table.
3722 * (ATExecDropColumn independently enforces that, but it's not bulletproof
3723 * so we need the dependencies too.)
3724 */
3725 for (i = 0; i < partnatts; i++)
3726 {
3727 if (partattrs[i] == 0)
3728 continue; /* ignore expressions here */
3729
3730 referenced.classId = RelationRelationId;
3731 referenced.objectId = RelationGetRelid(rel);
3732 referenced.objectSubId = partattrs[i];
3733
3734 recordDependencyOn(&referenced, &myself, DEPENDENCY_INTERNAL);
3735 }
3736
3737 /*
3738 * Also consider anything mentioned in partition expressions. External
3739 * references (e.g. functions) get NORMAL dependencies. Table columns
3740 * mentioned in the expressions are handled the same as plain partitioning
3741 * columns, i.e. they become internally dependent on the whole table.
3742 */
3743 if (partexprs)
3744 recordDependencyOnSingleRelExpr(&myself,
3745 (Node *) partexprs,
3746 RelationGetRelid(rel),
3747 DEPENDENCY_NORMAL,
3748 DEPENDENCY_INTERNAL,
3749 true /* reverse the self-deps */ );
3750
3751 /*
3752 * We must invalidate the relcache so that the next
3753 * CommandCounterIncrement() will cause the same to be rebuilt using the
3754 * information in just created catalog entry.
3755 */
3756 CacheInvalidateRelcache(rel);
3757 }
3758
3759 /*
3760 * RemovePartitionKeyByRelId
3761 * Remove pg_partitioned_table entry for a relation
3762 */
3763 void
RemovePartitionKeyByRelId(Oid relid)3764 RemovePartitionKeyByRelId(Oid relid)
3765 {
3766 Relation rel;
3767 HeapTuple tuple;
3768
3769 rel = table_open(PartitionedRelationId, RowExclusiveLock);
3770
3771 tuple = SearchSysCache1(PARTRELID, ObjectIdGetDatum(relid));
3772 if (!HeapTupleIsValid(tuple))
3773 elog(ERROR, "cache lookup failed for partition key of relation %u",
3774 relid);
3775
3776 CatalogTupleDelete(rel, &tuple->t_self);
3777
3778 ReleaseSysCache(tuple);
3779 table_close(rel, RowExclusiveLock);
3780 }
3781
3782 /*
3783 * StorePartitionBound
3784 * Update pg_class tuple of rel to store the partition bound and set
3785 * relispartition to true
3786 *
3787 * If this is the default partition, also update the default partition OID in
3788 * pg_partitioned_table.
3789 *
3790 * Also, invalidate the parent's relcache, so that the next rebuild will load
3791 * the new partition's info into its partition descriptor. If there is a
3792 * default partition, we must invalidate its relcache entry as well.
3793 */
3794 void
StorePartitionBound(Relation rel,Relation parent,PartitionBoundSpec * bound)3795 StorePartitionBound(Relation rel, Relation parent, PartitionBoundSpec *bound)
3796 {
3797 Relation classRel;
3798 HeapTuple tuple,
3799 newtuple;
3800 Datum new_val[Natts_pg_class];
3801 bool new_null[Natts_pg_class],
3802 new_repl[Natts_pg_class];
3803 Oid defaultPartOid;
3804
3805 /* Update pg_class tuple */
3806 classRel = table_open(RelationRelationId, RowExclusiveLock);
3807 tuple = SearchSysCacheCopy1(RELOID,
3808 ObjectIdGetDatum(RelationGetRelid(rel)));
3809 if (!HeapTupleIsValid(tuple))
3810 elog(ERROR, "cache lookup failed for relation %u",
3811 RelationGetRelid(rel));
3812
3813 #ifdef USE_ASSERT_CHECKING
3814 {
3815 Form_pg_class classForm;
3816 bool isnull;
3817
3818 classForm = (Form_pg_class) GETSTRUCT(tuple);
3819 Assert(!classForm->relispartition);
3820 (void) SysCacheGetAttr(RELOID, tuple, Anum_pg_class_relpartbound,
3821 &isnull);
3822 Assert(isnull);
3823 }
3824 #endif
3825
3826 /* Fill in relpartbound value */
3827 memset(new_val, 0, sizeof(new_val));
3828 memset(new_null, false, sizeof(new_null));
3829 memset(new_repl, false, sizeof(new_repl));
3830 new_val[Anum_pg_class_relpartbound - 1] = CStringGetTextDatum(nodeToString(bound));
3831 new_null[Anum_pg_class_relpartbound - 1] = false;
3832 new_repl[Anum_pg_class_relpartbound - 1] = true;
3833 newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
3834 new_val, new_null, new_repl);
3835 /* Also set the flag */
3836 ((Form_pg_class) GETSTRUCT(newtuple))->relispartition = true;
3837 CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
3838 heap_freetuple(newtuple);
3839 table_close(classRel, RowExclusiveLock);
3840
3841 /*
3842 * If we're storing bounds for the default partition, update
3843 * pg_partitioned_table too.
3844 */
3845 if (bound->is_default)
3846 update_default_partition_oid(RelationGetRelid(parent),
3847 RelationGetRelid(rel));
3848
3849 /* Make these updates visible */
3850 CommandCounterIncrement();
3851
3852 /*
3853 * The partition constraint for the default partition depends on the
3854 * partition bounds of every other partition, so we must invalidate the
3855 * relcache entry for that partition every time a partition is added or
3856 * removed.
3857 */
3858 defaultPartOid = get_default_oid_from_partdesc(RelationGetPartitionDesc(parent));
3859 if (OidIsValid(defaultPartOid))
3860 CacheInvalidateRelcacheByRelid(defaultPartOid);
3861
3862 CacheInvalidateRelcache(parent);
3863 }
3864