1 /*-------------------------------------------------------------------------
2 *
3 * heap.c
4 * code to create and destroy POSTGRES heap relations
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/catalog/heap.c
12 *
13 *
14 * INTERFACE ROUTINES
15 * heap_create() - Create an uncataloged heap relation
16 * heap_create_with_catalog() - Create a cataloged relation
17 * heap_drop_with_catalog() - Removes named relation from catalogs
18 *
19 * NOTES
20 * this code taken from access/heap/create.c, which contains
21 * the old heap_create_with_catalog, amcreate, and amdestroy.
22 * those routines will soon call these routines using the function
23 * manager,
24 * just like the poorly named "NewXXX" routines do. The
25 * "New" routines are all going to die soon, once and for all!
26 * -cim 1/13/91
27 *
28 *-------------------------------------------------------------------------
29 */
30 #include "postgres.h"
31
32 #include "access/genam.h"
33 #include "access/htup_details.h"
34 #include "access/multixact.h"
35 #include "access/relation.h"
36 #include "access/sysattr.h"
37 #include "access/table.h"
38 #include "access/tableam.h"
39 #include "access/toast_compression.h"
40 #include "access/transam.h"
41 #include "access/xact.h"
42 #include "access/xlog.h"
43 #include "catalog/binary_upgrade.h"
44 #include "catalog/catalog.h"
45 #include "catalog/dependency.h"
46 #include "catalog/heap.h"
47 #include "catalog/index.h"
48 #include "catalog/objectaccess.h"
49 #include "catalog/partition.h"
50 #include "catalog/pg_am.h"
51 #include "catalog/pg_attrdef.h"
52 #include "catalog/pg_collation.h"
53 #include "catalog/pg_constraint.h"
54 #include "catalog/pg_foreign_table.h"
55 #include "catalog/pg_inherits.h"
56 #include "catalog/pg_namespace.h"
57 #include "catalog/pg_opclass.h"
58 #include "catalog/pg_partitioned_table.h"
59 #include "catalog/pg_statistic.h"
60 #include "catalog/pg_subscription_rel.h"
61 #include "catalog/pg_tablespace.h"
62 #include "catalog/pg_type.h"
63 #include "catalog/storage.h"
64 #include "catalog/storage_xlog.h"
65 #include "commands/tablecmds.h"
66 #include "commands/typecmds.h"
67 #include "executor/executor.h"
68 #include "miscadmin.h"
69 #include "nodes/nodeFuncs.h"
70 #include "optimizer/optimizer.h"
71 #include "parser/parse_coerce.h"
72 #include "parser/parse_collate.h"
73 #include "parser/parse_expr.h"
74 #include "parser/parse_relation.h"
75 #include "parser/parsetree.h"
76 #include "partitioning/partdesc.h"
77 #include "storage/lmgr.h"
78 #include "storage/predicate.h"
79 #include "storage/smgr.h"
80 #include "utils/acl.h"
81 #include "utils/builtins.h"
82 #include "utils/datum.h"
83 #include "utils/fmgroids.h"
84 #include "utils/inval.h"
85 #include "utils/lsyscache.h"
86 #include "utils/partcache.h"
87 #include "utils/ruleutils.h"
88 #include "utils/snapmgr.h"
89 #include "utils/syscache.h"
90
91
92 /* Potentially set by pg_upgrade_support functions */
93 Oid binary_upgrade_next_heap_pg_class_oid = InvalidOid;
94 Oid binary_upgrade_next_toast_pg_class_oid = InvalidOid;
95
96 static void AddNewRelationTuple(Relation pg_class_desc,
97 Relation new_rel_desc,
98 Oid new_rel_oid,
99 Oid new_type_oid,
100 Oid reloftype,
101 Oid relowner,
102 char relkind,
103 TransactionId relfrozenxid,
104 TransactionId relminmxid,
105 Datum relacl,
106 Datum reloptions);
107 static ObjectAddress AddNewRelationType(const char *typeName,
108 Oid typeNamespace,
109 Oid new_rel_oid,
110 char new_rel_kind,
111 Oid ownerid,
112 Oid new_row_type,
113 Oid new_array_type);
114 static void RelationRemoveInheritance(Oid relid);
115 static Oid StoreRelCheck(Relation rel, const char *ccname, Node *expr,
116 bool is_validated, bool is_local, int inhcount,
117 bool is_no_inherit, bool is_internal);
118 static void StoreConstraints(Relation rel, List *cooked_constraints,
119 bool is_internal);
120 static bool MergeWithExistingConstraint(Relation rel, const char *ccname, Node *expr,
121 bool allow_merge, bool is_local,
122 bool is_initially_valid,
123 bool is_no_inherit);
124 static void SetRelationNumChecks(Relation rel, int numchecks);
125 static Node *cookConstraint(ParseState *pstate,
126 Node *raw_constraint,
127 char *relname);
128
129
130 /* ----------------------------------------------------------------
131 * XXX UGLY HARD CODED BADNESS FOLLOWS XXX
132 *
133 * these should all be moved to someplace in the lib/catalog
134 * module, if not obliterated first.
135 * ----------------------------------------------------------------
136 */
137
138
139 /*
140 * Note:
141 * Should the system special case these attributes in the future?
142 * Advantage: consume much less space in the ATTRIBUTE relation.
143 * Disadvantage: special cases will be all over the place.
144 */
145
146 /*
147 * The initializers below do not include trailing variable length fields,
148 * but that's OK - we're never going to reference anything beyond the
149 * fixed-size portion of the structure anyway. Fields that can default
150 * to zeroes are also not mentioned.
151 */
152
153 static const FormData_pg_attribute a1 = {
154 .attname = {"ctid"},
155 .atttypid = TIDOID,
156 .attlen = sizeof(ItemPointerData),
157 .attnum = SelfItemPointerAttributeNumber,
158 .attcacheoff = -1,
159 .atttypmod = -1,
160 .attbyval = false,
161 .attalign = TYPALIGN_SHORT,
162 .attstorage = TYPSTORAGE_PLAIN,
163 .attnotnull = true,
164 .attislocal = true,
165 };
166
167 static const FormData_pg_attribute a2 = {
168 .attname = {"xmin"},
169 .atttypid = XIDOID,
170 .attlen = sizeof(TransactionId),
171 .attnum = MinTransactionIdAttributeNumber,
172 .attcacheoff = -1,
173 .atttypmod = -1,
174 .attbyval = true,
175 .attalign = TYPALIGN_INT,
176 .attstorage = TYPSTORAGE_PLAIN,
177 .attnotnull = true,
178 .attislocal = true,
179 };
180
181 static const FormData_pg_attribute a3 = {
182 .attname = {"cmin"},
183 .atttypid = CIDOID,
184 .attlen = sizeof(CommandId),
185 .attnum = MinCommandIdAttributeNumber,
186 .attcacheoff = -1,
187 .atttypmod = -1,
188 .attbyval = true,
189 .attalign = TYPALIGN_INT,
190 .attstorage = TYPSTORAGE_PLAIN,
191 .attnotnull = true,
192 .attislocal = true,
193 };
194
195 static const FormData_pg_attribute a4 = {
196 .attname = {"xmax"},
197 .atttypid = XIDOID,
198 .attlen = sizeof(TransactionId),
199 .attnum = MaxTransactionIdAttributeNumber,
200 .attcacheoff = -1,
201 .atttypmod = -1,
202 .attbyval = true,
203 .attalign = TYPALIGN_INT,
204 .attstorage = TYPSTORAGE_PLAIN,
205 .attnotnull = true,
206 .attislocal = true,
207 };
208
209 static const FormData_pg_attribute a5 = {
210 .attname = {"cmax"},
211 .atttypid = CIDOID,
212 .attlen = sizeof(CommandId),
213 .attnum = MaxCommandIdAttributeNumber,
214 .attcacheoff = -1,
215 .atttypmod = -1,
216 .attbyval = true,
217 .attalign = TYPALIGN_INT,
218 .attstorage = TYPSTORAGE_PLAIN,
219 .attnotnull = true,
220 .attislocal = true,
221 };
222
223 /*
224 * We decided to call this attribute "tableoid" rather than say
225 * "classoid" on the basis that in the future there may be more than one
226 * table of a particular class/type. In any case table is still the word
227 * used in SQL.
228 */
229 static const FormData_pg_attribute a6 = {
230 .attname = {"tableoid"},
231 .atttypid = OIDOID,
232 .attlen = sizeof(Oid),
233 .attnum = TableOidAttributeNumber,
234 .attcacheoff = -1,
235 .atttypmod = -1,
236 .attbyval = true,
237 .attalign = TYPALIGN_INT,
238 .attstorage = TYPSTORAGE_PLAIN,
239 .attnotnull = true,
240 .attislocal = true,
241 };
242
243 static const FormData_pg_attribute *SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6};
244
245 /*
246 * This function returns a Form_pg_attribute pointer for a system attribute.
247 * Note that we elog if the presented attno is invalid, which would only
248 * happen if there's a problem upstream.
249 */
250 const FormData_pg_attribute *
SystemAttributeDefinition(AttrNumber attno)251 SystemAttributeDefinition(AttrNumber attno)
252 {
253 if (attno >= 0 || attno < -(int) lengthof(SysAtt))
254 elog(ERROR, "invalid system attribute number %d", attno);
255 return SysAtt[-attno - 1];
256 }
257
258 /*
259 * If the given name is a system attribute name, return a Form_pg_attribute
260 * pointer for a prototype definition. If not, return NULL.
261 */
262 const FormData_pg_attribute *
SystemAttributeByName(const char * attname)263 SystemAttributeByName(const char *attname)
264 {
265 int j;
266
267 for (j = 0; j < (int) lengthof(SysAtt); j++)
268 {
269 const FormData_pg_attribute *att = SysAtt[j];
270
271 if (strcmp(NameStr(att->attname), attname) == 0)
272 return att;
273 }
274
275 return NULL;
276 }
277
278
279 /* ----------------------------------------------------------------
280 * XXX END OF UGLY HARD CODED BADNESS XXX
281 * ---------------------------------------------------------------- */
282
283
284 /* ----------------------------------------------------------------
285 * heap_create - Create an uncataloged heap relation
286 *
287 * Note API change: the caller must now always provide the OID
288 * to use for the relation. The relfilenode may (and, normally,
289 * should) be left unspecified.
290 *
291 * rel->rd_rel is initialized by RelationBuildLocalRelation,
292 * and is mostly zeroes at return.
293 * ----------------------------------------------------------------
294 */
295 Relation
heap_create(const char * relname,Oid relnamespace,Oid reltablespace,Oid relid,Oid relfilenode,Oid accessmtd,TupleDesc tupDesc,char relkind,char relpersistence,bool shared_relation,bool mapped_relation,bool allow_system_table_mods,TransactionId * relfrozenxid,MultiXactId * relminmxid)296 heap_create(const char *relname,
297 Oid relnamespace,
298 Oid reltablespace,
299 Oid relid,
300 Oid relfilenode,
301 Oid accessmtd,
302 TupleDesc tupDesc,
303 char relkind,
304 char relpersistence,
305 bool shared_relation,
306 bool mapped_relation,
307 bool allow_system_table_mods,
308 TransactionId *relfrozenxid,
309 MultiXactId *relminmxid)
310 {
311 bool create_storage;
312 Relation rel;
313
314 /* The caller must have provided an OID for the relation. */
315 Assert(OidIsValid(relid));
316
317 /*
318 * Don't allow creating relations in pg_catalog directly, even though it
319 * is allowed to move user defined relations there. Semantics with search
320 * paths including pg_catalog are too confusing for now.
321 *
322 * But allow creating indexes on relations in pg_catalog even if
323 * allow_system_table_mods = off, upper layers already guarantee it's on a
324 * user defined relation, not a system one.
325 */
326 if (!allow_system_table_mods &&
327 ((IsCatalogNamespace(relnamespace) && relkind != RELKIND_INDEX) ||
328 IsToastNamespace(relnamespace)) &&
329 IsNormalProcessingMode())
330 ereport(ERROR,
331 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
332 errmsg("permission denied to create \"%s.%s\"",
333 get_namespace_name(relnamespace), relname),
334 errdetail("System catalog modifications are currently disallowed.")));
335
336 *relfrozenxid = InvalidTransactionId;
337 *relminmxid = InvalidMultiXactId;
338
339 /* Handle reltablespace for specific relkinds. */
340 switch (relkind)
341 {
342 case RELKIND_VIEW:
343 case RELKIND_COMPOSITE_TYPE:
344 case RELKIND_FOREIGN_TABLE:
345
346 /*
347 * Force reltablespace to zero if the relation has no physical
348 * storage. This is mainly just for cleanliness' sake.
349 *
350 * Partitioned tables and indexes don't have physical storage
351 * either, but we want to keep their tablespace settings so that
352 * their children can inherit it.
353 */
354 reltablespace = InvalidOid;
355 break;
356
357 case RELKIND_SEQUENCE:
358
359 /*
360 * Force reltablespace to zero for sequences, since we don't
361 * support moving them around into different tablespaces.
362 */
363 reltablespace = InvalidOid;
364 break;
365 default:
366 break;
367 }
368
369 /*
370 * Decide whether to create storage. If caller passed a valid relfilenode,
371 * storage is already created, so don't do it here. Also don't create it
372 * for relkinds without physical storage.
373 */
374 if (!RELKIND_HAS_STORAGE(relkind) || OidIsValid(relfilenode))
375 create_storage = false;
376 else
377 {
378 create_storage = true;
379 relfilenode = relid;
380 }
381
382 /*
383 * Never allow a pg_class entry to explicitly specify the database's
384 * default tablespace in reltablespace; force it to zero instead. This
385 * ensures that if the database is cloned with a different default
386 * tablespace, the pg_class entry will still match where CREATE DATABASE
387 * will put the physically copied relation.
388 *
389 * Yes, this is a bit of a hack.
390 */
391 if (reltablespace == MyDatabaseTableSpace)
392 reltablespace = InvalidOid;
393
394 /*
395 * build the relcache entry.
396 */
397 rel = RelationBuildLocalRelation(relname,
398 relnamespace,
399 tupDesc,
400 relid,
401 accessmtd,
402 relfilenode,
403 reltablespace,
404 shared_relation,
405 mapped_relation,
406 relpersistence,
407 relkind);
408
409 /*
410 * Have the storage manager create the relation's disk file, if needed.
411 *
412 * For relations the callback creates both the main and the init fork, for
413 * indexes only the main fork is created. The other forks will be created
414 * on demand.
415 */
416 if (create_storage)
417 {
418 RelationOpenSmgr(rel);
419
420 switch (rel->rd_rel->relkind)
421 {
422 case RELKIND_VIEW:
423 case RELKIND_COMPOSITE_TYPE:
424 case RELKIND_FOREIGN_TABLE:
425 case RELKIND_PARTITIONED_TABLE:
426 case RELKIND_PARTITIONED_INDEX:
427 Assert(false);
428 break;
429
430 case RELKIND_INDEX:
431 case RELKIND_SEQUENCE:
432 RelationCreateStorage(rel->rd_node, relpersistence);
433 break;
434
435 case RELKIND_RELATION:
436 case RELKIND_TOASTVALUE:
437 case RELKIND_MATVIEW:
438 table_relation_set_new_filenode(rel, &rel->rd_node,
439 relpersistence,
440 relfrozenxid, relminmxid);
441 break;
442 }
443 }
444
445 /*
446 * If a tablespace is specified, removal of that tablespace is normally
447 * protected by the existence of a physical file; but for relations with
448 * no files, add a pg_shdepend entry to account for that.
449 */
450 if (!create_storage && reltablespace != InvalidOid)
451 recordDependencyOnTablespace(RelationRelationId, relid,
452 reltablespace);
453
454 return rel;
455 }
456
457 /* ----------------------------------------------------------------
458 * heap_create_with_catalog - Create a cataloged relation
459 *
460 * this is done in multiple steps:
461 *
462 * 1) CheckAttributeNamesTypes() is used to make certain the tuple
463 * descriptor contains a valid set of attribute names and types
464 *
465 * 2) pg_class is opened and get_relname_relid()
466 * performs a scan to ensure that no relation with the
467 * same name already exists.
468 *
469 * 3) heap_create() is called to create the new relation on disk.
470 *
471 * 4) TypeCreate() is called to define a new type corresponding
472 * to the new relation.
473 *
474 * 5) AddNewRelationTuple() is called to register the
475 * relation in pg_class.
476 *
477 * 6) AddNewAttributeTuples() is called to register the
478 * new relation's schema in pg_attribute.
479 *
 *		7) StoreConstraints() is called - vadim 08/22/97
481 *
482 * 8) the relations are closed and the new relation's oid
483 * is returned.
484 *
485 * ----------------------------------------------------------------
486 */
487
488 /* --------------------------------
489 * CheckAttributeNamesTypes
490 *
491 * this is used to make certain the tuple descriptor contains a
492 * valid set of attribute names and datatypes. a problem simply
493 * generates ereport(ERROR) which aborts the current transaction.
494 *
495 * relkind is the relkind of the relation to be created.
496 * flags controls which datatypes are allowed, cf CheckAttributeType.
497 * --------------------------------
498 */
499 void
CheckAttributeNamesTypes(TupleDesc tupdesc,char relkind,int flags)500 CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
501 int flags)
502 {
503 int i;
504 int j;
505 int natts = tupdesc->natts;
506
507 /* Sanity check on column count */
508 if (natts < 0 || natts > MaxHeapAttributeNumber)
509 ereport(ERROR,
510 (errcode(ERRCODE_TOO_MANY_COLUMNS),
511 errmsg("tables can have at most %d columns",
512 MaxHeapAttributeNumber)));
513
514 /*
515 * first check for collision with system attribute names
516 *
517 * Skip this for a view or type relation, since those don't have system
518 * attributes.
519 */
520 if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
521 {
522 for (i = 0; i < natts; i++)
523 {
524 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
525
526 if (SystemAttributeByName(NameStr(attr->attname)) != NULL)
527 ereport(ERROR,
528 (errcode(ERRCODE_DUPLICATE_COLUMN),
529 errmsg("column name \"%s\" conflicts with a system column name",
530 NameStr(attr->attname))));
531 }
532 }
533
534 /*
535 * next check for repeated attribute names
536 */
537 for (i = 1; i < natts; i++)
538 {
539 for (j = 0; j < i; j++)
540 {
541 if (strcmp(NameStr(TupleDescAttr(tupdesc, j)->attname),
542 NameStr(TupleDescAttr(tupdesc, i)->attname)) == 0)
543 ereport(ERROR,
544 (errcode(ERRCODE_DUPLICATE_COLUMN),
545 errmsg("column name \"%s\" specified more than once",
546 NameStr(TupleDescAttr(tupdesc, j)->attname))));
547 }
548 }
549
550 /*
551 * next check the attribute types
552 */
553 for (i = 0; i < natts; i++)
554 {
555 CheckAttributeType(NameStr(TupleDescAttr(tupdesc, i)->attname),
556 TupleDescAttr(tupdesc, i)->atttypid,
557 TupleDescAttr(tupdesc, i)->attcollation,
558 NIL, /* assume we're creating a new rowtype */
559 flags);
560 }
561 }
562
563 /* --------------------------------
564 * CheckAttributeType
565 *
566 * Verify that the proposed datatype of an attribute is legal.
567 * This is needed mainly because there are types (and pseudo-types)
568 * in the catalogs that we do not support as elements of real tuples.
569 * We also check some other properties required of a table column.
570 *
571 * If the attribute is being proposed for addition to an existing table or
572 * composite type, pass a one-element list of the rowtype OID as
573 * containing_rowtypes. When checking a to-be-created rowtype, it's
574 * sufficient to pass NIL, because there could not be any recursive reference
575 * to a not-yet-existing rowtype.
576 *
577 * flags is a bitmask controlling which datatypes we allow. For the most
578 * part, pseudo-types are disallowed as attribute types, but there are some
579 * exceptions: ANYARRAYOID, RECORDOID, and RECORDARRAYOID can be allowed
580 * in some cases. (This works because values of those type classes are
581 * self-identifying to some extent. However, RECORDOID and RECORDARRAYOID
582 * are reliably identifiable only within a session, since the identity info
583 * may use a typmod that is only locally assigned. The caller is expected
584 * to know whether these cases are safe.)
585 *
586 * flags can also control the phrasing of the error messages. If
587 * CHKATYPE_IS_PARTKEY is specified, "attname" should be a partition key
588 * column number as text, not a real column name.
589 * --------------------------------
590 */
591 void
CheckAttributeType(const char * attname,Oid atttypid,Oid attcollation,List * containing_rowtypes,int flags)592 CheckAttributeType(const char *attname,
593 Oid atttypid, Oid attcollation,
594 List *containing_rowtypes,
595 int flags)
596 {
597 char att_typtype = get_typtype(atttypid);
598 Oid att_typelem;
599
600 if (att_typtype == TYPTYPE_PSEUDO)
601 {
602 /*
603 * We disallow pseudo-type columns, with the exception of ANYARRAY,
604 * RECORD, and RECORD[] when the caller says that those are OK.
605 *
606 * We don't need to worry about recursive containment for RECORD and
607 * RECORD[] because (a) no named composite type should be allowed to
608 * contain those, and (b) two "anonymous" record types couldn't be
609 * considered to be the same type, so infinite recursion isn't
610 * possible.
611 */
612 if (!((atttypid == ANYARRAYOID && (flags & CHKATYPE_ANYARRAY)) ||
613 (atttypid == RECORDOID && (flags & CHKATYPE_ANYRECORD)) ||
614 (atttypid == RECORDARRAYOID && (flags & CHKATYPE_ANYRECORD))))
615 {
616 if (flags & CHKATYPE_IS_PARTKEY)
617 ereport(ERROR,
618 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
619 /* translator: first %s is an integer not a name */
620 errmsg("partition key column %s has pseudo-type %s",
621 attname, format_type_be(atttypid))));
622 else
623 ereport(ERROR,
624 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
625 errmsg("column \"%s\" has pseudo-type %s",
626 attname, format_type_be(atttypid))));
627 }
628 }
629 else if (att_typtype == TYPTYPE_DOMAIN)
630 {
631 /*
632 * If it's a domain, recurse to check its base type.
633 */
634 CheckAttributeType(attname, getBaseType(atttypid), attcollation,
635 containing_rowtypes,
636 flags);
637 }
638 else if (att_typtype == TYPTYPE_COMPOSITE)
639 {
640 /*
641 * For a composite type, recurse into its attributes.
642 */
643 Relation relation;
644 TupleDesc tupdesc;
645 int i;
646
647 /*
648 * Check for self-containment. Eventually we might be able to allow
649 * this (just return without complaint, if so) but it's not clear how
650 * many other places would require anti-recursion defenses before it
651 * would be safe to allow tables to contain their own rowtype.
652 */
653 if (list_member_oid(containing_rowtypes, atttypid))
654 ereport(ERROR,
655 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
656 errmsg("composite type %s cannot be made a member of itself",
657 format_type_be(atttypid))));
658
659 containing_rowtypes = lappend_oid(containing_rowtypes, atttypid);
660
661 relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);
662
663 tupdesc = RelationGetDescr(relation);
664
665 for (i = 0; i < tupdesc->natts; i++)
666 {
667 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
668
669 if (attr->attisdropped)
670 continue;
671 CheckAttributeType(NameStr(attr->attname),
672 attr->atttypid, attr->attcollation,
673 containing_rowtypes,
674 flags & ~CHKATYPE_IS_PARTKEY);
675 }
676
677 relation_close(relation, AccessShareLock);
678
679 containing_rowtypes = list_delete_last(containing_rowtypes);
680 }
681 else if (att_typtype == TYPTYPE_RANGE)
682 {
683 /*
684 * If it's a range, recurse to check its subtype.
685 */
686 CheckAttributeType(attname, get_range_subtype(atttypid),
687 get_range_collation(atttypid),
688 containing_rowtypes,
689 flags);
690 }
691 else if (OidIsValid((att_typelem = get_element_type(atttypid))))
692 {
693 /*
694 * Must recurse into array types, too, in case they are composite.
695 */
696 CheckAttributeType(attname, att_typelem, attcollation,
697 containing_rowtypes,
698 flags);
699 }
700
701 /*
702 * This might not be strictly invalid per SQL standard, but it is pretty
703 * useless, and it cannot be dumped, so we must disallow it.
704 */
705 if (!OidIsValid(attcollation) && type_is_collatable(atttypid))
706 {
707 if (flags & CHKATYPE_IS_PARTKEY)
708 ereport(ERROR,
709 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
710 /* translator: first %s is an integer not a name */
711 errmsg("no collation was derived for partition key column %s with collatable type %s",
712 attname, format_type_be(atttypid)),
713 errhint("Use the COLLATE clause to set the collation explicitly.")));
714 else
715 ereport(ERROR,
716 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
717 errmsg("no collation was derived for column \"%s\" with collatable type %s",
718 attname, format_type_be(atttypid)),
719 errhint("Use the COLLATE clause to set the collation explicitly.")));
720 }
721 }
722
723 /*
724 * InsertPgAttributeTuples
725 * Construct and insert a set of tuples in pg_attribute.
726 *
727 * Caller has already opened and locked pg_attribute. tupdesc contains the
728 * attributes to insert. attcacheoff is always initialized to -1, attacl,
729 * attfdwoptions and attmissingval are always initialized to NULL. attoptions
730 * must contain the same number of elements as tupdesc, or be NULL.
731 *
732 * indstate is the index state for CatalogTupleInsertWithInfo. It can be
733 * passed as NULL, in which case we'll fetch the necessary info. (Don't do
734 * this when inserting multiple attributes, because it's a tad more
735 * expensive.)
736 *
737 * new_rel_oid is the relation OID assigned to the attributes inserted.
738 * If set to InvalidOid, the relation OID from tupdesc is used instead.
739 */
740 void
InsertPgAttributeTuples(Relation pg_attribute_rel,TupleDesc tupdesc,Oid new_rel_oid,Datum * attoptions,CatalogIndexState indstate)741 InsertPgAttributeTuples(Relation pg_attribute_rel,
742 TupleDesc tupdesc,
743 Oid new_rel_oid,
744 Datum *attoptions,
745 CatalogIndexState indstate)
746 {
747 TupleTableSlot **slot;
748 TupleDesc td;
749 int nslots;
750 int natts = 0;
751 int slotCount = 0;
752 bool close_index = false;
753
754 td = RelationGetDescr(pg_attribute_rel);
755
756 /* Initialize the number of slots to use */
757 nslots = Min(tupdesc->natts,
758 (MAX_CATALOG_MULTI_INSERT_BYTES / sizeof(FormData_pg_attribute)));
759 slot = palloc(sizeof(TupleTableSlot *) * nslots);
760 for (int i = 0; i < nslots; i++)
761 slot[i] = MakeSingleTupleTableSlot(td, &TTSOpsHeapTuple);
762
763 while (natts < tupdesc->natts)
764 {
765 Form_pg_attribute attrs = TupleDescAttr(tupdesc, natts);
766
767 ExecClearTuple(slot[slotCount]);
768
769 memset(slot[slotCount]->tts_isnull, false,
770 slot[slotCount]->tts_tupleDescriptor->natts * sizeof(bool));
771
772 if (new_rel_oid != InvalidOid)
773 slot[slotCount]->tts_values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_rel_oid);
774 else
775 slot[slotCount]->tts_values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(attrs->attrelid);
776
777 slot[slotCount]->tts_values[Anum_pg_attribute_attname - 1] = NameGetDatum(&attrs->attname);
778 slot[slotCount]->tts_values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(attrs->atttypid);
779 slot[slotCount]->tts_values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(attrs->attstattarget);
780 slot[slotCount]->tts_values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(attrs->attlen);
781 slot[slotCount]->tts_values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(attrs->attnum);
782 slot[slotCount]->tts_values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(attrs->attndims);
783 slot[slotCount]->tts_values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(-1);
784 slot[slotCount]->tts_values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(attrs->atttypmod);
785 slot[slotCount]->tts_values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(attrs->attbyval);
786 slot[slotCount]->tts_values[Anum_pg_attribute_attalign - 1] = CharGetDatum(attrs->attalign);
787 slot[slotCount]->tts_values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(attrs->attstorage);
788 slot[slotCount]->tts_values[Anum_pg_attribute_attcompression - 1] = CharGetDatum(attrs->attcompression);
789 slot[slotCount]->tts_values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(attrs->attnotnull);
790 slot[slotCount]->tts_values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(attrs->atthasdef);
791 slot[slotCount]->tts_values[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(attrs->atthasmissing);
792 slot[slotCount]->tts_values[Anum_pg_attribute_attidentity - 1] = CharGetDatum(attrs->attidentity);
793 slot[slotCount]->tts_values[Anum_pg_attribute_attgenerated - 1] = CharGetDatum(attrs->attgenerated);
794 slot[slotCount]->tts_values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(attrs->attisdropped);
795 slot[slotCount]->tts_values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(attrs->attislocal);
796 slot[slotCount]->tts_values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(attrs->attinhcount);
797 slot[slotCount]->tts_values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(attrs->attcollation);
798 if (attoptions && attoptions[natts] != (Datum) 0)
799 slot[slotCount]->tts_values[Anum_pg_attribute_attoptions - 1] = attoptions[natts];
800 else
801 slot[slotCount]->tts_isnull[Anum_pg_attribute_attoptions - 1] = true;
802
803 /* start out with empty permissions and empty options */
804 slot[slotCount]->tts_isnull[Anum_pg_attribute_attacl - 1] = true;
805 slot[slotCount]->tts_isnull[Anum_pg_attribute_attfdwoptions - 1] = true;
806 slot[slotCount]->tts_isnull[Anum_pg_attribute_attmissingval - 1] = true;
807
808 ExecStoreVirtualTuple(slot[slotCount]);
809 slotCount++;
810
811 /*
812 * If slots are full or the end of processing has been reached, insert
813 * a batch of tuples.
814 */
815 if (slotCount == nslots || natts == tupdesc->natts - 1)
816 {
817 /* fetch index info only when we know we need it */
818 if (!indstate)
819 {
820 indstate = CatalogOpenIndexes(pg_attribute_rel);
821 close_index = true;
822 }
823
824 /* insert the new tuples and update the indexes */
825 CatalogTuplesMultiInsertWithInfo(pg_attribute_rel, slot, slotCount,
826 indstate);
827 slotCount = 0;
828 }
829
830 natts++;
831 }
832
833 if (close_index)
834 CatalogCloseIndexes(indstate);
835 for (int i = 0; i < nslots; i++)
836 ExecDropSingleTupleTableSlot(slot[i]);
837 pfree(slot);
838 }
839
840 /* --------------------------------
841 * AddNewAttributeTuples
842 *
843 * this registers the new relation's schema by adding
844 * tuples to pg_attribute.
845 * --------------------------------
846 */
847 static void
AddNewAttributeTuples(Oid new_rel_oid,TupleDesc tupdesc,char relkind)848 AddNewAttributeTuples(Oid new_rel_oid,
849 TupleDesc tupdesc,
850 char relkind)
851 {
852 Relation rel;
853 CatalogIndexState indstate;
854 int natts = tupdesc->natts;
855 ObjectAddress myself,
856 referenced;
857
858 /*
859 * open pg_attribute and its indexes.
860 */
861 rel = table_open(AttributeRelationId, RowExclusiveLock);
862
863 indstate = CatalogOpenIndexes(rel);
864
865 /* set stats detail level to a sane default */
866 for (int i = 0; i < natts; i++)
867 tupdesc->attrs[i].attstattarget = -1;
868 InsertPgAttributeTuples(rel, tupdesc, new_rel_oid, NULL, indstate);
869
870 /* add dependencies on their datatypes and collations */
871 for (int i = 0; i < natts; i++)
872 {
873 /* Add dependency info */
874 ObjectAddressSubSet(myself, RelationRelationId, new_rel_oid, i + 1);
875 ObjectAddressSet(referenced, TypeRelationId,
876 tupdesc->attrs[i].atttypid);
877 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
878
879 /* The default collation is pinned, so don't bother recording it */
880 if (OidIsValid(tupdesc->attrs[i].attcollation) &&
881 tupdesc->attrs[i].attcollation != DEFAULT_COLLATION_OID)
882 {
883 ObjectAddressSet(referenced, CollationRelationId,
884 tupdesc->attrs[i].attcollation);
885 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
886 }
887 }
888
889 /*
890 * Next we add the system attributes. Skip OID if rel has no OIDs. Skip
891 * all for a view or type relation. We don't bother with making datatype
892 * dependencies here, since presumably all these types are pinned.
893 */
894 if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
895 {
896 TupleDesc td;
897
898 td = CreateTupleDesc(lengthof(SysAtt), (FormData_pg_attribute **) &SysAtt);
899
900 InsertPgAttributeTuples(rel, td, new_rel_oid, NULL, indstate);
901 FreeTupleDesc(td);
902 }
903
904 /*
905 * clean up
906 */
907 CatalogCloseIndexes(indstate);
908
909 table_close(rel, RowExclusiveLock);
910 }
911
912 /* --------------------------------
913 * InsertPgClassTuple
914 *
915 * Construct and insert a new tuple in pg_class.
916 *
917 * Caller has already opened and locked pg_class.
918 * Tuple data is taken from new_rel_desc->rd_rel, except for the
919 * variable-width fields which are not present in a cached reldesc.
920 * relacl and reloptions are passed in Datum form (to avoid having
921 * to reference the data types in heap.h). Pass (Datum) 0 to set them
922 * to NULL.
923 * --------------------------------
924 */
925 void
InsertPgClassTuple(Relation pg_class_desc,Relation new_rel_desc,Oid new_rel_oid,Datum relacl,Datum reloptions)926 InsertPgClassTuple(Relation pg_class_desc,
927 Relation new_rel_desc,
928 Oid new_rel_oid,
929 Datum relacl,
930 Datum reloptions)
931 {
932 Form_pg_class rd_rel = new_rel_desc->rd_rel;
933 Datum values[Natts_pg_class];
934 bool nulls[Natts_pg_class];
935 HeapTuple tup;
936
937 /* This is a tad tedious, but way cleaner than what we used to do... */
938 memset(values, 0, sizeof(values));
939 memset(nulls, false, sizeof(nulls));
940
941 values[Anum_pg_class_oid - 1] = ObjectIdGetDatum(new_rel_oid);
942 values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
943 values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
944 values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
945 values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
946 values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
947 values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
948 values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
949 values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(rd_rel->reltablespace);
950 values[Anum_pg_class_relpages - 1] = Int32GetDatum(rd_rel->relpages);
951 values[Anum_pg_class_reltuples - 1] = Float4GetDatum(rd_rel->reltuples);
952 values[Anum_pg_class_relallvisible - 1] = Int32GetDatum(rd_rel->relallvisible);
953 values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(rd_rel->reltoastrelid);
954 values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
955 values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
956 values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
957 values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
958 values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
959 values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
960 values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
961 values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
962 values[Anum_pg_class_relrowsecurity - 1] = BoolGetDatum(rd_rel->relrowsecurity);
963 values[Anum_pg_class_relforcerowsecurity - 1] = BoolGetDatum(rd_rel->relforcerowsecurity);
964 values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
965 values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
966 values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
967 values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition);
968 values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite);
969 values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
970 values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
971 if (relacl != (Datum) 0)
972 values[Anum_pg_class_relacl - 1] = relacl;
973 else
974 nulls[Anum_pg_class_relacl - 1] = true;
975 if (reloptions != (Datum) 0)
976 values[Anum_pg_class_reloptions - 1] = reloptions;
977 else
978 nulls[Anum_pg_class_reloptions - 1] = true;
979
980 /* relpartbound is set by updating this tuple, if necessary */
981 nulls[Anum_pg_class_relpartbound - 1] = true;
982
983 tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
984
985 /* finally insert the new tuple, update the indexes, and clean up */
986 CatalogTupleInsert(pg_class_desc, tup);
987
988 heap_freetuple(tup);
989 }
990
991 /* --------------------------------
992 * AddNewRelationTuple
993 *
994 * this registers the new relation in the catalogs by
995 * adding a tuple to pg_class.
996 * --------------------------------
997 */
998 static void
AddNewRelationTuple(Relation pg_class_desc,Relation new_rel_desc,Oid new_rel_oid,Oid new_type_oid,Oid reloftype,Oid relowner,char relkind,TransactionId relfrozenxid,TransactionId relminmxid,Datum relacl,Datum reloptions)999 AddNewRelationTuple(Relation pg_class_desc,
1000 Relation new_rel_desc,
1001 Oid new_rel_oid,
1002 Oid new_type_oid,
1003 Oid reloftype,
1004 Oid relowner,
1005 char relkind,
1006 TransactionId relfrozenxid,
1007 TransactionId relminmxid,
1008 Datum relacl,
1009 Datum reloptions)
1010 {
1011 Form_pg_class new_rel_reltup;
1012
1013 /*
1014 * first we update some of the information in our uncataloged relation's
1015 * relation descriptor.
1016 */
1017 new_rel_reltup = new_rel_desc->rd_rel;
1018
1019 switch (relkind)
1020 {
1021 case RELKIND_RELATION:
1022 case RELKIND_MATVIEW:
1023 case RELKIND_INDEX:
1024 case RELKIND_TOASTVALUE:
1025 /* The relation is real, but as yet empty */
1026 new_rel_reltup->relpages = 0;
1027 new_rel_reltup->reltuples = -1;
1028 new_rel_reltup->relallvisible = 0;
1029 break;
1030 case RELKIND_SEQUENCE:
1031 /* Sequences always have a known size */
1032 new_rel_reltup->relpages = 1;
1033 new_rel_reltup->reltuples = 1;
1034 new_rel_reltup->relallvisible = 0;
1035 break;
1036 default:
1037 /* Views, etc, have no disk storage */
1038 new_rel_reltup->relpages = 0;
1039 new_rel_reltup->reltuples = -1;
1040 new_rel_reltup->relallvisible = 0;
1041 break;
1042 }
1043
1044 new_rel_reltup->relfrozenxid = relfrozenxid;
1045 new_rel_reltup->relminmxid = relminmxid;
1046 new_rel_reltup->relowner = relowner;
1047 new_rel_reltup->reltype = new_type_oid;
1048 new_rel_reltup->reloftype = reloftype;
1049
1050 /* relispartition is always set by updating this tuple later */
1051 new_rel_reltup->relispartition = false;
1052
1053 /* fill rd_att's type ID with something sane even if reltype is zero */
1054 new_rel_desc->rd_att->tdtypeid = new_type_oid ? new_type_oid : RECORDOID;
1055 new_rel_desc->rd_att->tdtypmod = -1;
1056
1057 /* Now build and insert the tuple */
1058 InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
1059 relacl, reloptions);
1060 }
1061
1062
1063 /* --------------------------------
1064 * AddNewRelationType -
1065 *
1066 * define a composite type corresponding to the new relation
1067 * --------------------------------
1068 */
1069 static ObjectAddress
AddNewRelationType(const char * typeName,Oid typeNamespace,Oid new_rel_oid,char new_rel_kind,Oid ownerid,Oid new_row_type,Oid new_array_type)1070 AddNewRelationType(const char *typeName,
1071 Oid typeNamespace,
1072 Oid new_rel_oid,
1073 char new_rel_kind,
1074 Oid ownerid,
1075 Oid new_row_type,
1076 Oid new_array_type)
1077 {
1078 return
1079 TypeCreate(new_row_type, /* optional predetermined OID */
1080 typeName, /* type name */
1081 typeNamespace, /* type namespace */
1082 new_rel_oid, /* relation oid */
1083 new_rel_kind, /* relation kind */
1084 ownerid, /* owner's ID */
1085 -1, /* internal size (varlena) */
1086 TYPTYPE_COMPOSITE, /* type-type (composite) */
1087 TYPCATEGORY_COMPOSITE, /* type-category (ditto) */
1088 false, /* composite types are never preferred */
1089 DEFAULT_TYPDELIM, /* default array delimiter */
1090 F_RECORD_IN, /* input procedure */
1091 F_RECORD_OUT, /* output procedure */
1092 F_RECORD_RECV, /* receive procedure */
1093 F_RECORD_SEND, /* send procedure */
1094 InvalidOid, /* typmodin procedure - none */
1095 InvalidOid, /* typmodout procedure - none */
1096 InvalidOid, /* analyze procedure - default */
1097 InvalidOid, /* subscript procedure - none */
1098 InvalidOid, /* array element type - irrelevant */
1099 false, /* this is not an array type */
1100 new_array_type, /* array type if any */
1101 InvalidOid, /* domain base type - irrelevant */
1102 NULL, /* default value - none */
1103 NULL, /* default binary representation */
1104 false, /* passed by reference */
1105 TYPALIGN_DOUBLE, /* alignment - must be the largest! */
1106 TYPSTORAGE_EXTENDED, /* fully TOASTable */
1107 -1, /* typmod */
1108 0, /* array dimensions for typBaseType */
1109 false, /* Type NOT NULL */
1110 InvalidOid); /* rowtypes never have a collation */
1111 }
1112
1113 /* --------------------------------
1114 * heap_create_with_catalog
1115 *
1116 * creates a new cataloged relation. see comments above.
1117 *
1118 * Arguments:
1119 * relname: name to give to new rel
1120 * relnamespace: OID of namespace it goes in
1121 * reltablespace: OID of tablespace it goes in
1122 * relid: OID to assign to new rel, or InvalidOid to select a new OID
1123 * reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
1124 * reloftypeid: if a typed table, OID of underlying type; else InvalidOid
1125 * ownerid: OID of new rel's owner
1126 * accessmtd: OID of new rel's access method
1127 * tupdesc: tuple descriptor (source of column definitions)
1128 * cooked_constraints: list of precooked check constraints and defaults
1129 * relkind: relkind for new rel
1130 * relpersistence: rel's persistence status (permanent, temp, or unlogged)
1131 * shared_relation: true if it's to be a shared relation
1132 * mapped_relation: true if the relation will use the relfilenode map
1133 * oncommit: ON COMMIT marking (only relevant if it's a temp table)
1134 * reloptions: reloptions in Datum form, or (Datum) 0 if none
1135 * use_user_acl: true if should look for user-defined default permissions;
1136 * if false, relacl is always set NULL
1137 * allow_system_table_mods: true to allow creation in system namespaces
1138 * is_internal: is this a system-generated catalog?
1139 *
1140 * Output parameters:
1141 * typaddress: if not null, gets the object address of the new pg_type entry
1142 * (this must be null if the relkind is one that doesn't get a pg_type entry)
1143 *
1144 * Returns the OID of the new relation
1145 * --------------------------------
1146 */
1147 Oid
heap_create_with_catalog(const char * relname,Oid relnamespace,Oid reltablespace,Oid relid,Oid reltypeid,Oid reloftypeid,Oid ownerid,Oid accessmtd,TupleDesc tupdesc,List * cooked_constraints,char relkind,char relpersistence,bool shared_relation,bool mapped_relation,OnCommitAction oncommit,Datum reloptions,bool use_user_acl,bool allow_system_table_mods,bool is_internal,Oid relrewrite,ObjectAddress * typaddress)1148 heap_create_with_catalog(const char *relname,
1149 Oid relnamespace,
1150 Oid reltablespace,
1151 Oid relid,
1152 Oid reltypeid,
1153 Oid reloftypeid,
1154 Oid ownerid,
1155 Oid accessmtd,
1156 TupleDesc tupdesc,
1157 List *cooked_constraints,
1158 char relkind,
1159 char relpersistence,
1160 bool shared_relation,
1161 bool mapped_relation,
1162 OnCommitAction oncommit,
1163 Datum reloptions,
1164 bool use_user_acl,
1165 bool allow_system_table_mods,
1166 bool is_internal,
1167 Oid relrewrite,
1168 ObjectAddress *typaddress)
1169 {
1170 Relation pg_class_desc;
1171 Relation new_rel_desc;
1172 Acl *relacl;
1173 Oid existing_relid;
1174 Oid old_type_oid;
1175 Oid new_type_oid;
1176 TransactionId relfrozenxid;
1177 MultiXactId relminmxid;
1178
1179 pg_class_desc = table_open(RelationRelationId, RowExclusiveLock);
1180
1181 /*
1182 * sanity checks
1183 */
1184 Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
1185
1186 /*
1187 * Validate proposed tupdesc for the desired relkind. If
1188 * allow_system_table_mods is on, allow ANYARRAY to be used; this is a
1189 * hack to allow creating pg_statistic and cloning it during VACUUM FULL.
1190 */
1191 CheckAttributeNamesTypes(tupdesc, relkind,
1192 allow_system_table_mods ? CHKATYPE_ANYARRAY : 0);
1193
1194 /*
1195 * This would fail later on anyway, if the relation already exists. But
1196 * by catching it here we can emit a nicer error message.
1197 */
1198 existing_relid = get_relname_relid(relname, relnamespace);
1199 if (existing_relid != InvalidOid)
1200 ereport(ERROR,
1201 (errcode(ERRCODE_DUPLICATE_TABLE),
1202 errmsg("relation \"%s\" already exists", relname)));
1203
1204 /*
1205 * Since we are going to create a rowtype as well, also check for
1206 * collision with an existing type name. If there is one and it's an
1207 * autogenerated array, we can rename it out of the way; otherwise we can
1208 * at least give a good error message.
1209 */
1210 old_type_oid = GetSysCacheOid2(TYPENAMENSP, Anum_pg_type_oid,
1211 CStringGetDatum(relname),
1212 ObjectIdGetDatum(relnamespace));
1213 if (OidIsValid(old_type_oid))
1214 {
1215 if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
1216 ereport(ERROR,
1217 (errcode(ERRCODE_DUPLICATE_OBJECT),
1218 errmsg("type \"%s\" already exists", relname),
1219 errhint("A relation has an associated type of the same name, "
1220 "so you must use a name that doesn't conflict "
1221 "with any existing type.")));
1222 }
1223
1224 /*
1225 * Shared relations must be in pg_global (last-ditch check)
1226 */
1227 if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
1228 elog(ERROR, "shared relations must be placed in pg_global tablespace");
1229
1230 /*
1231 * Allocate an OID for the relation, unless we were told what to use.
1232 *
1233 * The OID will be the relfilenode as well, so make sure it doesn't
1234 * collide with either pg_class OIDs or existing physical files.
1235 */
1236 if (!OidIsValid(relid))
1237 {
1238 /* Use binary-upgrade override for pg_class.oid/relfilenode? */
1239 if (IsBinaryUpgrade &&
1240 (relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
1241 relkind == RELKIND_VIEW || relkind == RELKIND_MATVIEW ||
1242 relkind == RELKIND_COMPOSITE_TYPE || relkind == RELKIND_FOREIGN_TABLE ||
1243 relkind == RELKIND_PARTITIONED_TABLE))
1244 {
1245 if (!OidIsValid(binary_upgrade_next_heap_pg_class_oid))
1246 ereport(ERROR,
1247 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1248 errmsg("pg_class heap OID value not set when in binary upgrade mode")));
1249
1250 relid = binary_upgrade_next_heap_pg_class_oid;
1251 binary_upgrade_next_heap_pg_class_oid = InvalidOid;
1252 }
1253 /* There might be no TOAST table, so we have to test for it. */
1254 else if (IsBinaryUpgrade &&
1255 OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
1256 relkind == RELKIND_TOASTVALUE)
1257 {
1258 relid = binary_upgrade_next_toast_pg_class_oid;
1259 binary_upgrade_next_toast_pg_class_oid = InvalidOid;
1260 }
1261 else
1262 relid = GetNewRelFileNode(reltablespace, pg_class_desc,
1263 relpersistence);
1264 }
1265
1266 /*
1267 * Determine the relation's initial permissions.
1268 */
1269 if (use_user_acl)
1270 {
1271 switch (relkind)
1272 {
1273 case RELKIND_RELATION:
1274 case RELKIND_VIEW:
1275 case RELKIND_MATVIEW:
1276 case RELKIND_FOREIGN_TABLE:
1277 case RELKIND_PARTITIONED_TABLE:
1278 relacl = get_user_default_acl(OBJECT_TABLE, ownerid,
1279 relnamespace);
1280 break;
1281 case RELKIND_SEQUENCE:
1282 relacl = get_user_default_acl(OBJECT_SEQUENCE, ownerid,
1283 relnamespace);
1284 break;
1285 default:
1286 relacl = NULL;
1287 break;
1288 }
1289 }
1290 else
1291 relacl = NULL;
1292
1293 /*
1294 * Create the relcache entry (mostly dummy at this point) and the physical
1295 * disk file. (If we fail further down, it's the smgr's responsibility to
1296 * remove the disk file again.)
1297 */
1298 new_rel_desc = heap_create(relname,
1299 relnamespace,
1300 reltablespace,
1301 relid,
1302 InvalidOid,
1303 accessmtd,
1304 tupdesc,
1305 relkind,
1306 relpersistence,
1307 shared_relation,
1308 mapped_relation,
1309 allow_system_table_mods,
1310 &relfrozenxid,
1311 &relminmxid);
1312
1313 Assert(relid == RelationGetRelid(new_rel_desc));
1314
1315 new_rel_desc->rd_rel->relrewrite = relrewrite;
1316
1317 /*
1318 * Decide whether to create a pg_type entry for the relation's rowtype.
1319 * These types are made except where the use of a relation as such is an
1320 * implementation detail: toast tables, sequences and indexes.
1321 */
1322 if (!(relkind == RELKIND_SEQUENCE ||
1323 relkind == RELKIND_TOASTVALUE ||
1324 relkind == RELKIND_INDEX ||
1325 relkind == RELKIND_PARTITIONED_INDEX))
1326 {
1327 Oid new_array_oid;
1328 ObjectAddress new_type_addr;
1329 char *relarrayname;
1330
1331 /*
1332 * We'll make an array over the composite type, too. For largely
1333 * historical reasons, the array type's OID is assigned first.
1334 */
1335 new_array_oid = AssignTypeArrayOid();
1336
1337 /*
1338 * Make the pg_type entry for the composite type. The OID of the
1339 * composite type can be preselected by the caller, but if reltypeid
1340 * is InvalidOid, we'll generate a new OID for it.
1341 *
1342 * NOTE: we could get a unique-index failure here, in case someone
1343 * else is creating the same type name in parallel but hadn't
1344 * committed yet when we checked for a duplicate name above.
1345 */
1346 new_type_addr = AddNewRelationType(relname,
1347 relnamespace,
1348 relid,
1349 relkind,
1350 ownerid,
1351 reltypeid,
1352 new_array_oid);
1353 new_type_oid = new_type_addr.objectId;
1354 if (typaddress)
1355 *typaddress = new_type_addr;
1356
1357 /* Now create the array type. */
1358 relarrayname = makeArrayTypeName(relname, relnamespace);
1359
1360 TypeCreate(new_array_oid, /* force the type's OID to this */
1361 relarrayname, /* Array type name */
1362 relnamespace, /* Same namespace as parent */
1363 InvalidOid, /* Not composite, no relationOid */
1364 0, /* relkind, also N/A here */
1365 ownerid, /* owner's ID */
1366 -1, /* Internal size (varlena) */
1367 TYPTYPE_BASE, /* Not composite - typelem is */
1368 TYPCATEGORY_ARRAY, /* type-category (array) */
1369 false, /* array types are never preferred */
1370 DEFAULT_TYPDELIM, /* default array delimiter */
1371 F_ARRAY_IN, /* array input proc */
1372 F_ARRAY_OUT, /* array output proc */
1373 F_ARRAY_RECV, /* array recv (bin) proc */
1374 F_ARRAY_SEND, /* array send (bin) proc */
1375 InvalidOid, /* typmodin procedure - none */
1376 InvalidOid, /* typmodout procedure - none */
1377 F_ARRAY_TYPANALYZE, /* array analyze procedure */
1378 F_ARRAY_SUBSCRIPT_HANDLER, /* array subscript procedure */
1379 new_type_oid, /* array element type - the rowtype */
1380 true, /* yes, this is an array type */
1381 InvalidOid, /* this has no array type */
1382 InvalidOid, /* domain base type - irrelevant */
1383 NULL, /* default value - none */
1384 NULL, /* default binary representation */
1385 false, /* passed by reference */
1386 TYPALIGN_DOUBLE, /* alignment - must be the largest! */
1387 TYPSTORAGE_EXTENDED, /* fully TOASTable */
1388 -1, /* typmod */
1389 0, /* array dimensions for typBaseType */
1390 false, /* Type NOT NULL */
1391 InvalidOid); /* rowtypes never have a collation */
1392
1393 pfree(relarrayname);
1394 }
1395 else
1396 {
1397 /* Caller should not be expecting a type to be created. */
1398 Assert(reltypeid == InvalidOid);
1399 Assert(typaddress == NULL);
1400
1401 new_type_oid = InvalidOid;
1402 }
1403
1404 /*
1405 * now create an entry in pg_class for the relation.
1406 *
1407 * NOTE: we could get a unique-index failure here, in case someone else is
1408 * creating the same relation name in parallel but hadn't committed yet
1409 * when we checked for a duplicate name above.
1410 */
1411 AddNewRelationTuple(pg_class_desc,
1412 new_rel_desc,
1413 relid,
1414 new_type_oid,
1415 reloftypeid,
1416 ownerid,
1417 relkind,
1418 relfrozenxid,
1419 relminmxid,
1420 PointerGetDatum(relacl),
1421 reloptions);
1422
1423 /*
1424 * now add tuples to pg_attribute for the attributes in our new relation.
1425 */
1426 AddNewAttributeTuples(relid, new_rel_desc->rd_att, relkind);
1427
1428 /*
1429 * Make a dependency link to force the relation to be deleted if its
1430 * namespace is. Also make a dependency link to its owner, as well as
1431 * dependencies for any roles mentioned in the default ACL.
1432 *
1433 * For composite types, these dependencies are tracked for the pg_type
1434 * entry, so we needn't record them here. Likewise, TOAST tables don't
1435 * need a namespace dependency (they live in a pinned namespace) nor an
1436 * owner dependency (they depend indirectly through the parent table), nor
1437 * should they have any ACL entries. The same applies for extension
1438 * dependencies.
1439 *
1440 * Also, skip this in bootstrap mode, since we don't make dependencies
1441 * while bootstrapping.
1442 */
1443 if (relkind != RELKIND_COMPOSITE_TYPE &&
1444 relkind != RELKIND_TOASTVALUE &&
1445 !IsBootstrapProcessingMode())
1446 {
1447 ObjectAddress myself,
1448 referenced;
1449 ObjectAddresses *addrs;
1450
1451 ObjectAddressSet(myself, RelationRelationId, relid);
1452
1453 recordDependencyOnOwner(RelationRelationId, relid, ownerid);
1454
1455 recordDependencyOnNewAcl(RelationRelationId, relid, 0, ownerid, relacl);
1456
1457 recordDependencyOnCurrentExtension(&myself, false);
1458
1459 addrs = new_object_addresses();
1460
1461 ObjectAddressSet(referenced, NamespaceRelationId, relnamespace);
1462 add_exact_object_address(&referenced, addrs);
1463
1464 if (reloftypeid)
1465 {
1466 ObjectAddressSet(referenced, TypeRelationId, reloftypeid);
1467 add_exact_object_address(&referenced, addrs);
1468 }
1469
1470 /*
1471 * Make a dependency link to force the relation to be deleted if its
1472 * access method is. Do this only for relation and materialized views.
1473 *
1474 * No need to add an explicit dependency for the toast table, as the
1475 * main table depends on it.
1476 */
1477 if (relkind == RELKIND_RELATION ||
1478 relkind == RELKIND_MATVIEW)
1479 {
1480 ObjectAddressSet(referenced, AccessMethodRelationId, accessmtd);
1481 add_exact_object_address(&referenced, addrs);
1482 }
1483
1484 record_object_address_dependencies(&myself, addrs, DEPENDENCY_NORMAL);
1485 free_object_addresses(addrs);
1486 }
1487
1488 /* Post creation hook for new relation */
1489 InvokeObjectPostCreateHookArg(RelationRelationId, relid, 0, is_internal);
1490
1491 /*
1492 * Store any supplied constraints and defaults.
1493 *
1494 * NB: this may do a CommandCounterIncrement and rebuild the relcache
1495 * entry, so the relation must be valid and self-consistent at this point.
1496 * In particular, there are not yet constraints and defaults anywhere.
1497 */
1498 StoreConstraints(new_rel_desc, cooked_constraints, is_internal);
1499
1500 /*
1501 * If there's a special on-commit action, remember it
1502 */
1503 if (oncommit != ONCOMMIT_NOOP)
1504 register_on_commit_action(relid, oncommit);
1505
1506 /*
1507 * ok, the relation has been cataloged, so close our relations and return
1508 * the OID of the newly created relation.
1509 */
1510 table_close(new_rel_desc, NoLock); /* do not unlock till end of xact */
1511 table_close(pg_class_desc, RowExclusiveLock);
1512
1513 return relid;
1514 }
1515
1516 /*
1517 * RelationRemoveInheritance
1518 *
1519 * Formerly, this routine checked for child relations and aborted the
1520 * deletion if any were found. Now we rely on the dependency mechanism
1521 * to check for or delete child relations. By the time we get here,
1522 * there are no children and we need only remove any pg_inherits rows
1523 * linking this relation to its parent(s).
1524 */
1525 static void
RelationRemoveInheritance(Oid relid)1526 RelationRemoveInheritance(Oid relid)
1527 {
1528 Relation catalogRelation;
1529 SysScanDesc scan;
1530 ScanKeyData key;
1531 HeapTuple tuple;
1532
1533 catalogRelation = table_open(InheritsRelationId, RowExclusiveLock);
1534
1535 ScanKeyInit(&key,
1536 Anum_pg_inherits_inhrelid,
1537 BTEqualStrategyNumber, F_OIDEQ,
1538 ObjectIdGetDatum(relid));
1539
1540 scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true,
1541 NULL, 1, &key);
1542
1543 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1544 CatalogTupleDelete(catalogRelation, &tuple->t_self);
1545
1546 systable_endscan(scan);
1547 table_close(catalogRelation, RowExclusiveLock);
1548 }
1549
1550 /*
1551 * DeleteRelationTuple
1552 *
1553 * Remove pg_class row for the given relid.
1554 *
1555 * Note: this is shared by relation deletion and index deletion. It's
1556 * not intended for use anyplace else.
1557 */
1558 void
DeleteRelationTuple(Oid relid)1559 DeleteRelationTuple(Oid relid)
1560 {
1561 Relation pg_class_desc;
1562 HeapTuple tup;
1563
1564 /* Grab an appropriate lock on the pg_class relation */
1565 pg_class_desc = table_open(RelationRelationId, RowExclusiveLock);
1566
1567 tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1568 if (!HeapTupleIsValid(tup))
1569 elog(ERROR, "cache lookup failed for relation %u", relid);
1570
1571 /* delete the relation tuple from pg_class, and finish up */
1572 CatalogTupleDelete(pg_class_desc, &tup->t_self);
1573
1574 ReleaseSysCache(tup);
1575
1576 table_close(pg_class_desc, RowExclusiveLock);
1577 }
1578
1579 /*
1580 * DeleteAttributeTuples
1581 *
1582 * Remove pg_attribute rows for the given relid.
1583 *
1584 * Note: this is shared by relation deletion and index deletion. It's
1585 * not intended for use anyplace else.
1586 */
1587 void
DeleteAttributeTuples(Oid relid)1588 DeleteAttributeTuples(Oid relid)
1589 {
1590 Relation attrel;
1591 SysScanDesc scan;
1592 ScanKeyData key[1];
1593 HeapTuple atttup;
1594
1595 /* Grab an appropriate lock on the pg_attribute relation */
1596 attrel = table_open(AttributeRelationId, RowExclusiveLock);
1597
1598 /* Use the index to scan only attributes of the target relation */
1599 ScanKeyInit(&key[0],
1600 Anum_pg_attribute_attrelid,
1601 BTEqualStrategyNumber, F_OIDEQ,
1602 ObjectIdGetDatum(relid));
1603
1604 scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1605 NULL, 1, key);
1606
1607 /* Delete all the matching tuples */
1608 while ((atttup = systable_getnext(scan)) != NULL)
1609 CatalogTupleDelete(attrel, &atttup->t_self);
1610
1611 /* Clean up after the scan */
1612 systable_endscan(scan);
1613 table_close(attrel, RowExclusiveLock);
1614 }
1615
1616 /*
1617 * DeleteSystemAttributeTuples
1618 *
1619 * Remove pg_attribute rows for system columns of the given relid.
1620 *
1621 * Note: this is only used when converting a table to a view. Views don't
1622 * have system columns, so we should remove them from pg_attribute.
1623 */
1624 void
DeleteSystemAttributeTuples(Oid relid)1625 DeleteSystemAttributeTuples(Oid relid)
1626 {
1627 Relation attrel;
1628 SysScanDesc scan;
1629 ScanKeyData key[2];
1630 HeapTuple atttup;
1631
1632 /* Grab an appropriate lock on the pg_attribute relation */
1633 attrel = table_open(AttributeRelationId, RowExclusiveLock);
1634
1635 /* Use the index to scan only system attributes of the target relation */
1636 ScanKeyInit(&key[0],
1637 Anum_pg_attribute_attrelid,
1638 BTEqualStrategyNumber, F_OIDEQ,
1639 ObjectIdGetDatum(relid));
1640 ScanKeyInit(&key[1],
1641 Anum_pg_attribute_attnum,
1642 BTLessEqualStrategyNumber, F_INT2LE,
1643 Int16GetDatum(0));
1644
1645 scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1646 NULL, 2, key);
1647
1648 /* Delete all the matching tuples */
1649 while ((atttup = systable_getnext(scan)) != NULL)
1650 CatalogTupleDelete(attrel, &atttup->t_self);
1651
1652 /* Clean up after the scan */
1653 systable_endscan(scan);
1654 table_close(attrel, RowExclusiveLock);
1655 }
1656
1657 /*
1658 * RemoveAttributeById
1659 *
1660 * This is the guts of ALTER TABLE DROP COLUMN: actually mark the attribute
1661 * deleted in pg_attribute. We also remove pg_statistic entries for it.
1662 * (Everything else needed, such as getting rid of any pg_attrdef entry,
1663 * is handled by dependency.c.)
1664 */
1665 void
RemoveAttributeById(Oid relid,AttrNumber attnum)1666 RemoveAttributeById(Oid relid, AttrNumber attnum)
1667 {
1668 Relation rel;
1669 Relation attr_rel;
1670 HeapTuple tuple;
1671 Form_pg_attribute attStruct;
1672 char newattname[NAMEDATALEN];
1673
1674 /*
1675 * Grab an exclusive lock on the target table, which we will NOT release
1676 * until end of transaction. (In the simple case where we are directly
1677 * dropping this column, ATExecDropColumn already did this ... but when
1678 * cascading from a drop of some other object, we may not have any lock.)
1679 */
1680 rel = relation_open(relid, AccessExclusiveLock);
1681
1682 attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
1683
1684 tuple = SearchSysCacheCopy2(ATTNUM,
1685 ObjectIdGetDatum(relid),
1686 Int16GetDatum(attnum));
1687 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
1688 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1689 attnum, relid);
1690 attStruct = (Form_pg_attribute) GETSTRUCT(tuple);
1691
1692 if (attnum < 0)
1693 {
1694 /* System attribute (probably OID) ... just delete the row */
1695
1696 CatalogTupleDelete(attr_rel, &tuple->t_self);
1697 }
1698 else
1699 {
1700 /* Dropping user attributes is lots harder */
1701
1702 /* Mark the attribute as dropped */
1703 attStruct->attisdropped = true;
1704
1705 /*
1706 * Set the type OID to invalid. A dropped attribute's type link
1707 * cannot be relied on (once the attribute is dropped, the type might
1708 * be too). Fortunately we do not need the type row --- the only
1709 * really essential information is the type's typlen and typalign,
1710 * which are preserved in the attribute's attlen and attalign. We set
1711 * atttypid to zero here as a means of catching code that incorrectly
1712 * expects it to be valid.
1713 */
1714 attStruct->atttypid = InvalidOid;
1715
1716 /* Remove any NOT NULL constraint the column may have */
1717 attStruct->attnotnull = false;
1718
1719 /* We don't want to keep stats for it anymore */
1720 attStruct->attstattarget = 0;
1721
1722 /* Unset this so no one tries to look up the generation expression */
1723 attStruct->attgenerated = '\0';
1724
1725 /*
1726 * Change the column name to something that isn't likely to conflict
1727 */
1728 snprintf(newattname, sizeof(newattname),
1729 "........pg.dropped.%d........", attnum);
1730 namestrcpy(&(attStruct->attname), newattname);
1731
1732 /* clear the missing value if any */
1733 if (attStruct->atthasmissing)
1734 {
1735 Datum valuesAtt[Natts_pg_attribute];
1736 bool nullsAtt[Natts_pg_attribute];
1737 bool replacesAtt[Natts_pg_attribute];
1738
1739 /* update the tuple - set atthasmissing and attmissingval */
1740 MemSet(valuesAtt, 0, sizeof(valuesAtt));
1741 MemSet(nullsAtt, false, sizeof(nullsAtt));
1742 MemSet(replacesAtt, false, sizeof(replacesAtt));
1743
1744 valuesAtt[Anum_pg_attribute_atthasmissing - 1] =
1745 BoolGetDatum(false);
1746 replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
1747 valuesAtt[Anum_pg_attribute_attmissingval - 1] = (Datum) 0;
1748 nullsAtt[Anum_pg_attribute_attmissingval - 1] = true;
1749 replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
1750
1751 tuple = heap_modify_tuple(tuple, RelationGetDescr(attr_rel),
1752 valuesAtt, nullsAtt, replacesAtt);
1753 }
1754
1755 CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
1756 }
1757
1758 /*
1759 * Because updating the pg_attribute row will trigger a relcache flush for
1760 * the target relation, we need not do anything else to notify other
1761 * backends of the change.
1762 */
1763
1764 table_close(attr_rel, RowExclusiveLock);
1765
1766 if (attnum > 0)
1767 RemoveStatistics(relid, attnum);
1768
1769 relation_close(rel, NoLock);
1770 }
1771
1772 /*
1773 * RemoveAttrDefault
1774 *
1775 * If the specified relation/attribute has a default, remove it.
1776 * (If no default, raise error if complain is true, else return quietly.)
1777 */
1778 void
RemoveAttrDefault(Oid relid,AttrNumber attnum,DropBehavior behavior,bool complain,bool internal)1779 RemoveAttrDefault(Oid relid, AttrNumber attnum,
1780 DropBehavior behavior, bool complain, bool internal)
1781 {
1782 Relation attrdef_rel;
1783 ScanKeyData scankeys[2];
1784 SysScanDesc scan;
1785 HeapTuple tuple;
1786 bool found = false;
1787
1788 attrdef_rel = table_open(AttrDefaultRelationId, RowExclusiveLock);
1789
1790 ScanKeyInit(&scankeys[0],
1791 Anum_pg_attrdef_adrelid,
1792 BTEqualStrategyNumber, F_OIDEQ,
1793 ObjectIdGetDatum(relid));
1794 ScanKeyInit(&scankeys[1],
1795 Anum_pg_attrdef_adnum,
1796 BTEqualStrategyNumber, F_INT2EQ,
1797 Int16GetDatum(attnum));
1798
1799 scan = systable_beginscan(attrdef_rel, AttrDefaultIndexId, true,
1800 NULL, 2, scankeys);
1801
1802 /* There should be at most one matching tuple, but we loop anyway */
1803 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1804 {
1805 ObjectAddress object;
1806 Form_pg_attrdef attrtuple = (Form_pg_attrdef) GETSTRUCT(tuple);
1807
1808 object.classId = AttrDefaultRelationId;
1809 object.objectId = attrtuple->oid;
1810 object.objectSubId = 0;
1811
1812 performDeletion(&object, behavior,
1813 internal ? PERFORM_DELETION_INTERNAL : 0);
1814
1815 found = true;
1816 }
1817
1818 systable_endscan(scan);
1819 table_close(attrdef_rel, RowExclusiveLock);
1820
1821 if (complain && !found)
1822 elog(ERROR, "could not find attrdef tuple for relation %u attnum %d",
1823 relid, attnum);
1824 }
1825
1826 /*
1827 * RemoveAttrDefaultById
1828 *
1829 * Remove a pg_attrdef entry specified by OID. This is the guts of
1830 * attribute-default removal. Note it should be called via performDeletion,
1831 * not directly.
1832 */
1833 void
RemoveAttrDefaultById(Oid attrdefId)1834 RemoveAttrDefaultById(Oid attrdefId)
1835 {
1836 Relation attrdef_rel;
1837 Relation attr_rel;
1838 Relation myrel;
1839 ScanKeyData scankeys[1];
1840 SysScanDesc scan;
1841 HeapTuple tuple;
1842 Oid myrelid;
1843 AttrNumber myattnum;
1844
1845 /* Grab an appropriate lock on the pg_attrdef relation */
1846 attrdef_rel = table_open(AttrDefaultRelationId, RowExclusiveLock);
1847
1848 /* Find the pg_attrdef tuple */
1849 ScanKeyInit(&scankeys[0],
1850 Anum_pg_attrdef_oid,
1851 BTEqualStrategyNumber, F_OIDEQ,
1852 ObjectIdGetDatum(attrdefId));
1853
1854 scan = systable_beginscan(attrdef_rel, AttrDefaultOidIndexId, true,
1855 NULL, 1, scankeys);
1856
1857 tuple = systable_getnext(scan);
1858 if (!HeapTupleIsValid(tuple))
1859 elog(ERROR, "could not find tuple for attrdef %u", attrdefId);
1860
1861 myrelid = ((Form_pg_attrdef) GETSTRUCT(tuple))->adrelid;
1862 myattnum = ((Form_pg_attrdef) GETSTRUCT(tuple))->adnum;
1863
1864 /* Get an exclusive lock on the relation owning the attribute */
1865 myrel = relation_open(myrelid, AccessExclusiveLock);
1866
1867 /* Now we can delete the pg_attrdef row */
1868 CatalogTupleDelete(attrdef_rel, &tuple->t_self);
1869
1870 systable_endscan(scan);
1871 table_close(attrdef_rel, RowExclusiveLock);
1872
1873 /* Fix the pg_attribute row */
1874 attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
1875
1876 tuple = SearchSysCacheCopy2(ATTNUM,
1877 ObjectIdGetDatum(myrelid),
1878 Int16GetDatum(myattnum));
1879 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
1880 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1881 myattnum, myrelid);
1882
1883 ((Form_pg_attribute) GETSTRUCT(tuple))->atthasdef = false;
1884
1885 CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
1886
1887 /*
1888 * Our update of the pg_attribute row will force a relcache rebuild, so
1889 * there's nothing else to do here.
1890 */
1891 table_close(attr_rel, RowExclusiveLock);
1892
1893 /* Keep lock on attribute's rel until end of xact */
1894 relation_close(myrel, NoLock);
1895 }
1896
1897 /*
1898 * heap_drop_with_catalog - removes specified relation from catalogs
1899 *
1900 * Note that this routine is not responsible for dropping objects that are
1901 * linked to the pg_class entry via dependencies (for example, indexes and
1902 * constraints). Those are deleted by the dependency-tracing logic in
1903 * dependency.c before control gets here. In general, therefore, this routine
1904 * should never be called directly; go through performDeletion() instead.
1905 */
1906 void
heap_drop_with_catalog(Oid relid)1907 heap_drop_with_catalog(Oid relid)
1908 {
1909 Relation rel;
1910 HeapTuple tuple;
1911 Oid parentOid = InvalidOid,
1912 defaultPartOid = InvalidOid;
1913
1914 /*
1915 * To drop a partition safely, we must grab exclusive lock on its parent,
1916 * because another backend might be about to execute a query on the parent
1917 * table. If it relies on previously cached partition descriptor, then it
1918 * could attempt to access the just-dropped relation as its partition. We
1919 * must therefore take a table lock strong enough to prevent all queries
1920 * on the table from proceeding until we commit and send out a
1921 * shared-cache-inval notice that will make them update their partition
1922 * descriptors.
1923 */
1924 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1925 if (!HeapTupleIsValid(tuple))
1926 elog(ERROR, "cache lookup failed for relation %u", relid);
1927 if (((Form_pg_class) GETSTRUCT(tuple))->relispartition)
1928 {
1929 /*
1930 * We have to lock the parent if the partition is being detached,
1931 * because it's possible that some query still has a partition
1932 * descriptor that includes this partition.
1933 */
1934 parentOid = get_partition_parent(relid, true);
1935 LockRelationOid(parentOid, AccessExclusiveLock);
1936
1937 /*
1938 * If this is not the default partition, dropping it will change the
1939 * default partition's partition constraint, so we must lock it.
1940 */
1941 defaultPartOid = get_default_partition_oid(parentOid);
1942 if (OidIsValid(defaultPartOid) && relid != defaultPartOid)
1943 LockRelationOid(defaultPartOid, AccessExclusiveLock);
1944 }
1945
1946 ReleaseSysCache(tuple);
1947
1948 /*
1949 * Open and lock the relation.
1950 */
1951 rel = relation_open(relid, AccessExclusiveLock);
1952
1953 /*
1954 * There can no longer be anyone *else* touching the relation, but we
1955 * might still have open queries or cursors, or pending trigger events, in
1956 * our own session.
1957 */
1958 CheckTableNotInUse(rel, "DROP TABLE");
1959
1960 /*
1961 * This effectively deletes all rows in the table, and may be done in a
1962 * serializable transaction. In that case we must record a rw-conflict in
1963 * to this transaction from each transaction holding a predicate lock on
1964 * the table.
1965 */
1966 CheckTableForSerializableConflictIn(rel);
1967
1968 /*
1969 * Delete pg_foreign_table tuple first.
1970 */
1971 if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1972 {
1973 Relation rel;
1974 HeapTuple tuple;
1975
1976 rel = table_open(ForeignTableRelationId, RowExclusiveLock);
1977
1978 tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
1979 if (!HeapTupleIsValid(tuple))
1980 elog(ERROR, "cache lookup failed for foreign table %u", relid);
1981
1982 CatalogTupleDelete(rel, &tuple->t_self);
1983
1984 ReleaseSysCache(tuple);
1985 table_close(rel, RowExclusiveLock);
1986 }
1987
1988 /*
1989 * If a partitioned table, delete the pg_partitioned_table tuple.
1990 */
1991 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1992 RemovePartitionKeyByRelId(relid);
1993
1994 /*
1995 * If the relation being dropped is the default partition itself,
1996 * invalidate its entry in pg_partitioned_table.
1997 */
1998 if (relid == defaultPartOid)
1999 update_default_partition_oid(parentOid, InvalidOid);
2000
2001 /*
2002 * Schedule unlinking of the relation's physical files at commit.
2003 */
2004 if (RELKIND_HAS_STORAGE(rel->rd_rel->relkind))
2005 RelationDropStorage(rel);
2006
2007 /*
2008 * Close relcache entry, but *keep* AccessExclusiveLock on the relation
2009 * until transaction commit. This ensures no one else will try to do
2010 * something with the doomed relation.
2011 */
2012 relation_close(rel, NoLock);
2013
2014 /*
2015 * Remove any associated relation synchronization states.
2016 */
2017 RemoveSubscriptionRel(InvalidOid, relid);
2018
2019 /*
2020 * Forget any ON COMMIT action for the rel
2021 */
2022 remove_on_commit_action(relid);
2023
2024 /*
2025 * Flush the relation from the relcache. We want to do this before
2026 * starting to remove catalog entries, just to be certain that no relcache
2027 * entry rebuild will happen partway through. (That should not really
2028 * matter, since we don't do CommandCounterIncrement here, but let's be
2029 * safe.)
2030 */
2031 RelationForgetRelation(relid);
2032
2033 /*
2034 * remove inheritance information
2035 */
2036 RelationRemoveInheritance(relid);
2037
2038 /*
2039 * delete statistics
2040 */
2041 RemoveStatistics(relid, 0);
2042
2043 /*
2044 * delete attribute tuples
2045 */
2046 DeleteAttributeTuples(relid);
2047
2048 /*
2049 * delete relation tuple
2050 */
2051 DeleteRelationTuple(relid);
2052
2053 if (OidIsValid(parentOid))
2054 {
2055 /*
2056 * If this is not the default partition, the partition constraint of
2057 * the default partition has changed to include the portion of the key
2058 * space previously covered by the dropped partition.
2059 */
2060 if (OidIsValid(defaultPartOid) && relid != defaultPartOid)
2061 CacheInvalidateRelcacheByRelid(defaultPartOid);
2062
2063 /*
2064 * Invalidate the parent's relcache so that the partition is no longer
2065 * included in its partition descriptor.
2066 */
2067 CacheInvalidateRelcacheByRelid(parentOid);
2068 /* keep the lock */
2069 }
2070 }
2071
2072
2073 /*
2074 * RelationClearMissing
2075 *
2076 * Set atthasmissing and attmissingval to false/null for all attributes
2077 * where they are currently set. This can be safely and usefully done if
2078 * the table is rewritten (e.g. by VACUUM FULL or CLUSTER) where we know there
2079 * are no rows left with less than a full complement of attributes.
2080 *
2081 * The caller must have an AccessExclusive lock on the relation.
2082 */
2083 void
RelationClearMissing(Relation rel)2084 RelationClearMissing(Relation rel)
2085 {
2086 Relation attr_rel;
2087 Oid relid = RelationGetRelid(rel);
2088 int natts = RelationGetNumberOfAttributes(rel);
2089 int attnum;
2090 Datum repl_val[Natts_pg_attribute];
2091 bool repl_null[Natts_pg_attribute];
2092 bool repl_repl[Natts_pg_attribute];
2093 Form_pg_attribute attrtuple;
2094 HeapTuple tuple,
2095 newtuple;
2096
2097 memset(repl_val, 0, sizeof(repl_val));
2098 memset(repl_null, false, sizeof(repl_null));
2099 memset(repl_repl, false, sizeof(repl_repl));
2100
2101 repl_val[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(false);
2102 repl_null[Anum_pg_attribute_attmissingval - 1] = true;
2103
2104 repl_repl[Anum_pg_attribute_atthasmissing - 1] = true;
2105 repl_repl[Anum_pg_attribute_attmissingval - 1] = true;
2106
2107
2108 /* Get a lock on pg_attribute */
2109 attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
2110
2111 /* process each non-system attribute, including any dropped columns */
2112 for (attnum = 1; attnum <= natts; attnum++)
2113 {
2114 tuple = SearchSysCache2(ATTNUM,
2115 ObjectIdGetDatum(relid),
2116 Int16GetDatum(attnum));
2117 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
2118 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
2119 attnum, relid);
2120
2121 attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
2122
2123 /* ignore any where atthasmissing is not true */
2124 if (attrtuple->atthasmissing)
2125 {
2126 newtuple = heap_modify_tuple(tuple, RelationGetDescr(attr_rel),
2127 repl_val, repl_null, repl_repl);
2128
2129 CatalogTupleUpdate(attr_rel, &newtuple->t_self, newtuple);
2130
2131 heap_freetuple(newtuple);
2132 }
2133
2134 ReleaseSysCache(tuple);
2135 }
2136
2137 /*
2138 * Our update of the pg_attribute rows will force a relcache rebuild, so
2139 * there's nothing else to do here.
2140 */
2141 table_close(attr_rel, RowExclusiveLock);
2142 }
2143
2144 /*
2145 * SetAttrMissing
2146 *
2147 * Set the missing value of a single attribute. This should only be used by
2148 * binary upgrade. Takes an AccessExclusive lock on the relation owning the
2149 * attribute.
2150 */
2151 void
SetAttrMissing(Oid relid,char * attname,char * value)2152 SetAttrMissing(Oid relid, char *attname, char *value)
2153 {
2154 Datum valuesAtt[Natts_pg_attribute];
2155 bool nullsAtt[Natts_pg_attribute];
2156 bool replacesAtt[Natts_pg_attribute];
2157 Datum missingval;
2158 Form_pg_attribute attStruct;
2159 Relation attrrel,
2160 tablerel;
2161 HeapTuple atttup,
2162 newtup;
2163
2164 /* lock the table the attribute belongs to */
2165 tablerel = table_open(relid, AccessExclusiveLock);
2166
2167 /* Don't do anything unless it's a plain table */
2168 if (tablerel->rd_rel->relkind != RELKIND_RELATION)
2169 {
2170 table_close(tablerel, AccessExclusiveLock);
2171 return;
2172 }
2173
2174 /* Lock the attribute row and get the data */
2175 attrrel = table_open(AttributeRelationId, RowExclusiveLock);
2176 atttup = SearchSysCacheAttName(relid, attname);
2177 if (!HeapTupleIsValid(atttup))
2178 elog(ERROR, "cache lookup failed for attribute %s of relation %u",
2179 attname, relid);
2180 attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
2181
2182 /* get an array value from the value string */
2183 missingval = OidFunctionCall3(F_ARRAY_IN,
2184 CStringGetDatum(value),
2185 ObjectIdGetDatum(attStruct->atttypid),
2186 Int32GetDatum(attStruct->atttypmod));
2187
2188 /* update the tuple - set atthasmissing and attmissingval */
2189 MemSet(valuesAtt, 0, sizeof(valuesAtt));
2190 MemSet(nullsAtt, false, sizeof(nullsAtt));
2191 MemSet(replacesAtt, false, sizeof(replacesAtt));
2192
2193 valuesAtt[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(true);
2194 replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
2195 valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
2196 replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
2197
2198 newtup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
2199 valuesAtt, nullsAtt, replacesAtt);
2200 CatalogTupleUpdate(attrrel, &newtup->t_self, newtup);
2201
2202 /* clean up */
2203 ReleaseSysCache(atttup);
2204 table_close(attrrel, RowExclusiveLock);
2205 table_close(tablerel, AccessExclusiveLock);
2206 }
2207
2208 /*
2209 * Store a default expression for column attnum of relation rel.
2210 *
2211 * Returns the OID of the new pg_attrdef tuple.
2212 *
2213 * add_column_mode must be true if we are storing the default for a new
2214 * attribute, and false if it's for an already existing attribute. The reason
2215 * for this is that the missing value must never be updated after it is set,
2216 * which can only be when a column is added to the table. Otherwise we would
2217 * in effect be changing existing tuples.
2218 */
2219 Oid
StoreAttrDefault(Relation rel,AttrNumber attnum,Node * expr,bool is_internal,bool add_column_mode)2220 StoreAttrDefault(Relation rel, AttrNumber attnum,
2221 Node *expr, bool is_internal, bool add_column_mode)
2222 {
2223 char *adbin;
2224 Relation adrel;
2225 HeapTuple tuple;
2226 Datum values[4];
2227 static bool nulls[4] = {false, false, false, false};
2228 Relation attrrel;
2229 HeapTuple atttup;
2230 Form_pg_attribute attStruct;
2231 char attgenerated;
2232 Oid attrdefOid;
2233 ObjectAddress colobject,
2234 defobject;
2235
2236 adrel = table_open(AttrDefaultRelationId, RowExclusiveLock);
2237
2238 /*
2239 * Flatten expression to string form for storage.
2240 */
2241 adbin = nodeToString(expr);
2242
2243 /*
2244 * Make the pg_attrdef entry.
2245 */
2246 attrdefOid = GetNewOidWithIndex(adrel, AttrDefaultOidIndexId,
2247 Anum_pg_attrdef_oid);
2248 values[Anum_pg_attrdef_oid - 1] = ObjectIdGetDatum(attrdefOid);
2249 values[Anum_pg_attrdef_adrelid - 1] = RelationGetRelid(rel);
2250 values[Anum_pg_attrdef_adnum - 1] = attnum;
2251 values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
2252
2253 tuple = heap_form_tuple(adrel->rd_att, values, nulls);
2254 CatalogTupleInsert(adrel, tuple);
2255
2256 defobject.classId = AttrDefaultRelationId;
2257 defobject.objectId = attrdefOid;
2258 defobject.objectSubId = 0;
2259
2260 table_close(adrel, RowExclusiveLock);
2261
2262 /* now can free some of the stuff allocated above */
2263 pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
2264 heap_freetuple(tuple);
2265 pfree(adbin);
2266
2267 /*
2268 * Update the pg_attribute entry for the column to show that a default
2269 * exists.
2270 */
2271 attrrel = table_open(AttributeRelationId, RowExclusiveLock);
2272 atttup = SearchSysCacheCopy2(ATTNUM,
2273 ObjectIdGetDatum(RelationGetRelid(rel)),
2274 Int16GetDatum(attnum));
2275 if (!HeapTupleIsValid(atttup))
2276 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
2277 attnum, RelationGetRelid(rel));
2278 attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
2279 attgenerated = attStruct->attgenerated;
2280 if (!attStruct->atthasdef)
2281 {
2282 Form_pg_attribute defAttStruct;
2283
2284 ExprState *exprState;
2285 Expr *expr2 = (Expr *) expr;
2286 EState *estate = NULL;
2287 ExprContext *econtext;
2288 Datum valuesAtt[Natts_pg_attribute];
2289 bool nullsAtt[Natts_pg_attribute];
2290 bool replacesAtt[Natts_pg_attribute];
2291 Datum missingval = (Datum) 0;
2292 bool missingIsNull = true;
2293
2294 MemSet(valuesAtt, 0, sizeof(valuesAtt));
2295 MemSet(nullsAtt, false, sizeof(nullsAtt));
2296 MemSet(replacesAtt, false, sizeof(replacesAtt));
2297 valuesAtt[Anum_pg_attribute_atthasdef - 1] = true;
2298 replacesAtt[Anum_pg_attribute_atthasdef - 1] = true;
2299
2300 if (rel->rd_rel->relkind == RELKIND_RELATION && add_column_mode &&
2301 !attgenerated)
2302 {
2303 expr2 = expression_planner(expr2);
2304 estate = CreateExecutorState();
2305 exprState = ExecPrepareExpr(expr2, estate);
2306 econtext = GetPerTupleExprContext(estate);
2307
2308 missingval = ExecEvalExpr(exprState, econtext,
2309 &missingIsNull);
2310
2311 FreeExecutorState(estate);
2312
2313 defAttStruct = TupleDescAttr(rel->rd_att, attnum - 1);
2314
2315 if (missingIsNull)
2316 {
2317 /* if the default evaluates to NULL, just store a NULL array */
2318 missingval = (Datum) 0;
2319 }
2320 else
2321 {
2322 /* otherwise make a one-element array of the value */
2323 missingval = PointerGetDatum(construct_array(&missingval,
2324 1,
2325 defAttStruct->atttypid,
2326 defAttStruct->attlen,
2327 defAttStruct->attbyval,
2328 defAttStruct->attalign));
2329 }
2330
2331 valuesAtt[Anum_pg_attribute_atthasmissing - 1] = !missingIsNull;
2332 replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
2333 valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
2334 replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
2335 nullsAtt[Anum_pg_attribute_attmissingval - 1] = missingIsNull;
2336 }
2337 atttup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
2338 valuesAtt, nullsAtt, replacesAtt);
2339
2340 CatalogTupleUpdate(attrrel, &atttup->t_self, atttup);
2341
2342 if (!missingIsNull)
2343 pfree(DatumGetPointer(missingval));
2344
2345 }
2346 table_close(attrrel, RowExclusiveLock);
2347 heap_freetuple(atttup);
2348
2349 /*
2350 * Make a dependency so that the pg_attrdef entry goes away if the column
2351 * (or whole table) is deleted.
2352 */
2353 colobject.classId = RelationRelationId;
2354 colobject.objectId = RelationGetRelid(rel);
2355 colobject.objectSubId = attnum;
2356
2357 recordDependencyOn(&defobject, &colobject, DEPENDENCY_AUTO);
2358
2359 /*
2360 * Record dependencies on objects used in the expression, too.
2361 */
2362 if (attgenerated)
2363 {
2364 /*
2365 * Generated column: Dropping anything that the generation expression
2366 * refers to automatically drops the generated column.
2367 */
2368 recordDependencyOnSingleRelExpr(&colobject, expr, RelationGetRelid(rel),
2369 DEPENDENCY_AUTO,
2370 DEPENDENCY_AUTO, false);
2371 }
2372 else
2373 {
2374 /*
2375 * Normal default: Dropping anything that the default refers to
2376 * requires CASCADE and drops the default only.
2377 */
2378 recordDependencyOnSingleRelExpr(&defobject, expr, RelationGetRelid(rel),
2379 DEPENDENCY_NORMAL,
2380 DEPENDENCY_NORMAL, false);
2381 }
2382
2383 /*
2384 * Post creation hook for attribute defaults.
2385 *
2386 * XXX. ALTER TABLE ALTER COLUMN SET/DROP DEFAULT is implemented with a
2387 * couple of deletion/creation of the attribute's default entry, so the
2388 * callee should check existence of an older version of this entry if it
2389 * needs to distinguish.
2390 */
2391 InvokeObjectPostCreateHookArg(AttrDefaultRelationId,
2392 RelationGetRelid(rel), attnum, is_internal);
2393
2394 return attrdefOid;
2395 }
2396
2397 /*
2398 * Store a check-constraint expression for the given relation.
2399 *
2400 * Caller is responsible for updating the count of constraints
2401 * in the pg_class entry for the relation.
2402 *
2403 * The OID of the new constraint is returned.
2404 */
2405 static Oid
StoreRelCheck(Relation rel,const char * ccname,Node * expr,bool is_validated,bool is_local,int inhcount,bool is_no_inherit,bool is_internal)2406 StoreRelCheck(Relation rel, const char *ccname, Node *expr,
2407 bool is_validated, bool is_local, int inhcount,
2408 bool is_no_inherit, bool is_internal)
2409 {
2410 char *ccbin;
2411 List *varList;
2412 int keycount;
2413 int16 *attNos;
2414 Oid constrOid;
2415
2416 /*
2417 * Flatten expression to string form for storage.
2418 */
2419 ccbin = nodeToString(expr);
2420
2421 /*
2422 * Find columns of rel that are used in expr
2423 *
2424 * NB: pull_var_clause is okay here only because we don't allow subselects
2425 * in check constraints; it would fail to examine the contents of
2426 * subselects.
2427 */
2428 varList = pull_var_clause(expr, 0);
2429 keycount = list_length(varList);
2430
2431 if (keycount > 0)
2432 {
2433 ListCell *vl;
2434 int i = 0;
2435
2436 attNos = (int16 *) palloc(keycount * sizeof(int16));
2437 foreach(vl, varList)
2438 {
2439 Var *var = (Var *) lfirst(vl);
2440 int j;
2441
2442 for (j = 0; j < i; j++)
2443 if (attNos[j] == var->varattno)
2444 break;
2445 if (j == i)
2446 attNos[i++] = var->varattno;
2447 }
2448 keycount = i;
2449 }
2450 else
2451 attNos = NULL;
2452
2453 /*
2454 * Partitioned tables do not contain any rows themselves, so a NO INHERIT
2455 * constraint makes no sense.
2456 */
2457 if (is_no_inherit &&
2458 rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2459 ereport(ERROR,
2460 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2461 errmsg("cannot add NO INHERIT constraint to partitioned table \"%s\"",
2462 RelationGetRelationName(rel))));
2463
2464 /*
2465 * Create the Check Constraint
2466 */
2467 constrOid =
2468 CreateConstraintEntry(ccname, /* Constraint Name */
2469 RelationGetNamespace(rel), /* namespace */
2470 CONSTRAINT_CHECK, /* Constraint Type */
2471 false, /* Is Deferrable */
2472 false, /* Is Deferred */
2473 is_validated,
2474 InvalidOid, /* no parent constraint */
2475 RelationGetRelid(rel), /* relation */
2476 attNos, /* attrs in the constraint */
2477 keycount, /* # key attrs in the constraint */
2478 keycount, /* # total attrs in the constraint */
2479 InvalidOid, /* not a domain constraint */
2480 InvalidOid, /* no associated index */
2481 InvalidOid, /* Foreign key fields */
2482 NULL,
2483 NULL,
2484 NULL,
2485 NULL,
2486 0,
2487 ' ',
2488 ' ',
2489 ' ',
2490 NULL, /* not an exclusion constraint */
2491 expr, /* Tree form of check constraint */
2492 ccbin, /* Binary form of check constraint */
2493 is_local, /* conislocal */
2494 inhcount, /* coninhcount */
2495 is_no_inherit, /* connoinherit */
2496 is_internal); /* internally constructed? */
2497
2498 pfree(ccbin);
2499
2500 return constrOid;
2501 }
2502
2503 /*
2504 * Store defaults and constraints (passed as a list of CookedConstraint).
2505 *
2506 * Each CookedConstraint struct is modified to store the new catalog tuple OID.
2507 *
2508 * NOTE: only pre-cooked expressions will be passed this way, which is to
2509 * say expressions inherited from an existing relation. Newly parsed
2510 * expressions can be added later, by direct calls to StoreAttrDefault
2511 * and StoreRelCheck (see AddRelationNewConstraints()).
2512 */
2513 static void
StoreConstraints(Relation rel,List * cooked_constraints,bool is_internal)2514 StoreConstraints(Relation rel, List *cooked_constraints, bool is_internal)
2515 {
2516 int numchecks = 0;
2517 ListCell *lc;
2518
2519 if (cooked_constraints == NIL)
2520 return; /* nothing to do */
2521
2522 /*
2523 * Deparsing of constraint expressions will fail unless the just-created
2524 * pg_attribute tuples for this relation are made visible. So, bump the
2525 * command counter. CAUTION: this will cause a relcache entry rebuild.
2526 */
2527 CommandCounterIncrement();
2528
2529 foreach(lc, cooked_constraints)
2530 {
2531 CookedConstraint *con = (CookedConstraint *) lfirst(lc);
2532
2533 switch (con->contype)
2534 {
2535 case CONSTR_DEFAULT:
2536 con->conoid = StoreAttrDefault(rel, con->attnum, con->expr,
2537 is_internal, false);
2538 break;
2539 case CONSTR_CHECK:
2540 con->conoid =
2541 StoreRelCheck(rel, con->name, con->expr,
2542 !con->skip_validation, con->is_local,
2543 con->inhcount, con->is_no_inherit,
2544 is_internal);
2545 numchecks++;
2546 break;
2547 default:
2548 elog(ERROR, "unrecognized constraint type: %d",
2549 (int) con->contype);
2550 }
2551 }
2552
2553 if (numchecks > 0)
2554 SetRelationNumChecks(rel, numchecks);
2555 }
2556
2557 /*
2558 * AddRelationNewConstraints
2559 *
2560 * Add new column default expressions and/or constraint check expressions
2561 * to an existing relation. This is defined to do both for efficiency in
2562 * DefineRelation, but of course you can do just one or the other by passing
2563 * empty lists.
2564 *
2565 * rel: relation to be modified
2566 * newColDefaults: list of RawColumnDefault structures
2567 * newConstraints: list of Constraint nodes
2568 * allow_merge: true if check constraints may be merged with existing ones
2569 * is_local: true if definition is local, false if it's inherited
2570 * is_internal: true if result of some internal process, not a user request
2571 *
2572 * All entries in newColDefaults will be processed. Entries in newConstraints
2573 * will be processed only if they are CONSTR_CHECK type.
2574 *
2575 * Returns a list of CookedConstraint nodes that shows the cooked form of
2576 * the default and constraint expressions added to the relation.
2577 *
2578 * NB: caller should have opened rel with some self-conflicting lock mode,
2579 * and should hold that lock till end of transaction; for normal cases that'll
2580 * be AccessExclusiveLock, but if caller knows that the constraint is already
2581 * enforced by some other means, it can be ShareUpdateExclusiveLock. Also, we
2582 * assume the caller has done a CommandCounterIncrement if necessary to make
2583 * the relation's catalog tuples visible.
2584 */
2585 List *
AddRelationNewConstraints(Relation rel,List * newColDefaults,List * newConstraints,bool allow_merge,bool is_local,bool is_internal,const char * queryString)2586 AddRelationNewConstraints(Relation rel,
2587 List *newColDefaults,
2588 List *newConstraints,
2589 bool allow_merge,
2590 bool is_local,
2591 bool is_internal,
2592 const char *queryString)
2593 {
2594 List *cookedConstraints = NIL;
2595 TupleDesc tupleDesc;
2596 TupleConstr *oldconstr;
2597 int numoldchecks;
2598 ParseState *pstate;
2599 ParseNamespaceItem *nsitem;
2600 int numchecks;
2601 List *checknames;
2602 ListCell *cell;
2603 Node *expr;
2604 CookedConstraint *cooked;
2605
2606 /*
2607 * Get info about existing constraints.
2608 */
2609 tupleDesc = RelationGetDescr(rel);
2610 oldconstr = tupleDesc->constr;
2611 if (oldconstr)
2612 numoldchecks = oldconstr->num_check;
2613 else
2614 numoldchecks = 0;
2615
2616 /*
2617 * Create a dummy ParseState and insert the target relation as its sole
2618 * rangetable entry. We need a ParseState for transformExpr.
2619 */
2620 pstate = make_parsestate(NULL);
2621 pstate->p_sourcetext = queryString;
2622 nsitem = addRangeTableEntryForRelation(pstate,
2623 rel,
2624 AccessShareLock,
2625 NULL,
2626 false,
2627 true);
2628 addNSItemToQuery(pstate, nsitem, true, true, true);
2629
2630 /*
2631 * Process column default expressions.
2632 */
2633 foreach(cell, newColDefaults)
2634 {
2635 RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
2636 Form_pg_attribute atp = TupleDescAttr(rel->rd_att, colDef->attnum - 1);
2637 Oid defOid;
2638
2639 expr = cookDefault(pstate, colDef->raw_default,
2640 atp->atttypid, atp->atttypmod,
2641 NameStr(atp->attname),
2642 atp->attgenerated);
2643
2644 /*
2645 * If the expression is just a NULL constant, we do not bother to make
2646 * an explicit pg_attrdef entry, since the default behavior is
2647 * equivalent. This applies to column defaults, but not for
2648 * generation expressions.
2649 *
2650 * Note a nonobvious property of this test: if the column is of a
2651 * domain type, what we'll get is not a bare null Const but a
2652 * CoerceToDomain expr, so we will not discard the default. This is
2653 * critical because the column default needs to be retained to
2654 * override any default that the domain might have.
2655 */
2656 if (expr == NULL ||
2657 (!colDef->generated &&
2658 IsA(expr, Const) &&
2659 castNode(Const, expr)->constisnull))
2660 continue;
2661
2662 /* If the DEFAULT is volatile we cannot use a missing value */
2663 if (colDef->missingMode && contain_volatile_functions((Node *) expr))
2664 colDef->missingMode = false;
2665
2666 defOid = StoreAttrDefault(rel, colDef->attnum, expr, is_internal,
2667 colDef->missingMode);
2668
2669 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2670 cooked->contype = CONSTR_DEFAULT;
2671 cooked->conoid = defOid;
2672 cooked->name = NULL;
2673 cooked->attnum = colDef->attnum;
2674 cooked->expr = expr;
2675 cooked->skip_validation = false;
2676 cooked->is_local = is_local;
2677 cooked->inhcount = is_local ? 0 : 1;
2678 cooked->is_no_inherit = false;
2679 cookedConstraints = lappend(cookedConstraints, cooked);
2680 }
2681
2682 /*
2683 * Process constraint expressions.
2684 */
2685 numchecks = numoldchecks;
2686 checknames = NIL;
2687 foreach(cell, newConstraints)
2688 {
2689 Constraint *cdef = (Constraint *) lfirst(cell);
2690 char *ccname;
2691 Oid constrOid;
2692
2693 if (cdef->contype != CONSTR_CHECK)
2694 continue;
2695
2696 if (cdef->raw_expr != NULL)
2697 {
2698 Assert(cdef->cooked_expr == NULL);
2699
2700 /*
2701 * Transform raw parsetree to executable expression, and verify
2702 * it's valid as a CHECK constraint.
2703 */
2704 expr = cookConstraint(pstate, cdef->raw_expr,
2705 RelationGetRelationName(rel));
2706 }
2707 else
2708 {
2709 Assert(cdef->cooked_expr != NULL);
2710
2711 /*
2712 * Here, we assume the parser will only pass us valid CHECK
2713 * expressions, so we do no particular checking.
2714 */
2715 expr = stringToNode(cdef->cooked_expr);
2716 }
2717
2718 /*
2719 * Check name uniqueness, or generate a name if none was given.
2720 */
2721 if (cdef->conname != NULL)
2722 {
2723 ListCell *cell2;
2724
2725 ccname = cdef->conname;
2726 /* Check against other new constraints */
2727 /* Needed because we don't do CommandCounterIncrement in loop */
2728 foreach(cell2, checknames)
2729 {
2730 if (strcmp((char *) lfirst(cell2), ccname) == 0)
2731 ereport(ERROR,
2732 (errcode(ERRCODE_DUPLICATE_OBJECT),
2733 errmsg("check constraint \"%s\" already exists",
2734 ccname)));
2735 }
2736
2737 /* save name for future checks */
2738 checknames = lappend(checknames, ccname);
2739
2740 /*
2741 * Check against pre-existing constraints. If we are allowed to
2742 * merge with an existing constraint, there's no more to do here.
2743 * (We omit the duplicate constraint from the result, which is
2744 * what ATAddCheckConstraint wants.)
2745 */
2746 if (MergeWithExistingConstraint(rel, ccname, expr,
2747 allow_merge, is_local,
2748 cdef->initially_valid,
2749 cdef->is_no_inherit))
2750 continue;
2751 }
2752 else
2753 {
2754 /*
2755 * When generating a name, we want to create "tab_col_check" for a
2756 * column constraint and "tab_check" for a table constraint. We
2757 * no longer have any info about the syntactic positioning of the
2758 * constraint phrase, so we approximate this by seeing whether the
2759 * expression references more than one column. (If the user
2760 * played by the rules, the result is the same...)
2761 *
2762 * Note: pull_var_clause() doesn't descend into sublinks, but we
2763 * eliminated those above; and anyway this only needs to be an
2764 * approximate answer.
2765 */
2766 List *vars;
2767 char *colname;
2768
2769 vars = pull_var_clause(expr, 0);
2770
2771 /* eliminate duplicates */
2772 vars = list_union(NIL, vars);
2773
2774 if (list_length(vars) == 1)
2775 colname = get_attname(RelationGetRelid(rel),
2776 ((Var *) linitial(vars))->varattno,
2777 true);
2778 else
2779 colname = NULL;
2780
2781 ccname = ChooseConstraintName(RelationGetRelationName(rel),
2782 colname,
2783 "check",
2784 RelationGetNamespace(rel),
2785 checknames);
2786
2787 /* save name for future checks */
2788 checknames = lappend(checknames, ccname);
2789 }
2790
2791 /*
2792 * OK, store it.
2793 */
2794 constrOid =
2795 StoreRelCheck(rel, ccname, expr, cdef->initially_valid, is_local,
2796 is_local ? 0 : 1, cdef->is_no_inherit, is_internal);
2797
2798 numchecks++;
2799
2800 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2801 cooked->contype = CONSTR_CHECK;
2802 cooked->conoid = constrOid;
2803 cooked->name = ccname;
2804 cooked->attnum = 0;
2805 cooked->expr = expr;
2806 cooked->skip_validation = cdef->skip_validation;
2807 cooked->is_local = is_local;
2808 cooked->inhcount = is_local ? 0 : 1;
2809 cooked->is_no_inherit = cdef->is_no_inherit;
2810 cookedConstraints = lappend(cookedConstraints, cooked);
2811 }
2812
2813 /*
2814 * Update the count of constraints in the relation's pg_class tuple. We do
2815 * this even if there was no change, in order to ensure that an SI update
2816 * message is sent out for the pg_class tuple, which will force other
2817 * backends to rebuild their relcache entries for the rel. (This is
2818 * critical if we added defaults but not constraints.)
2819 */
2820 SetRelationNumChecks(rel, numchecks);
2821
2822 return cookedConstraints;
2823 }
2824
2825 /*
2826 * Check for a pre-existing check constraint that conflicts with a proposed
2827 * new one, and either adjust its conislocal/coninhcount settings or throw
2828 * error as needed.
2829 *
2830 * Returns true if merged (constraint is a duplicate), or false if it's
2831 * got a so-far-unique name, or throws error if conflict.
2832 *
2833 * XXX See MergeConstraintsIntoExisting too if you change this code.
2834 */
2835 static bool
MergeWithExistingConstraint(Relation rel,const char * ccname,Node * expr,bool allow_merge,bool is_local,bool is_initially_valid,bool is_no_inherit)2836 MergeWithExistingConstraint(Relation rel, const char *ccname, Node *expr,
2837 bool allow_merge, bool is_local,
2838 bool is_initially_valid,
2839 bool is_no_inherit)
2840 {
2841 bool found;
2842 Relation conDesc;
2843 SysScanDesc conscan;
2844 ScanKeyData skey[3];
2845 HeapTuple tup;
2846
2847 /* Search for a pg_constraint entry with same name and relation */
2848 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
2849
2850 found = false;
2851
2852 ScanKeyInit(&skey[0],
2853 Anum_pg_constraint_conrelid,
2854 BTEqualStrategyNumber, F_OIDEQ,
2855 ObjectIdGetDatum(RelationGetRelid(rel)));
2856 ScanKeyInit(&skey[1],
2857 Anum_pg_constraint_contypid,
2858 BTEqualStrategyNumber, F_OIDEQ,
2859 ObjectIdGetDatum(InvalidOid));
2860 ScanKeyInit(&skey[2],
2861 Anum_pg_constraint_conname,
2862 BTEqualStrategyNumber, F_NAMEEQ,
2863 CStringGetDatum(ccname));
2864
2865 conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId, true,
2866 NULL, 3, skey);
2867
2868 /* There can be at most one matching row */
2869 if (HeapTupleIsValid(tup = systable_getnext(conscan)))
2870 {
2871 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
2872
2873 /* Found it. Conflicts if not identical check constraint */
2874 if (con->contype == CONSTRAINT_CHECK)
2875 {
2876 Datum val;
2877 bool isnull;
2878
2879 val = fastgetattr(tup,
2880 Anum_pg_constraint_conbin,
2881 conDesc->rd_att, &isnull);
2882 if (isnull)
2883 elog(ERROR, "null conbin for rel %s",
2884 RelationGetRelationName(rel));
2885 if (equal(expr, stringToNode(TextDatumGetCString(val))))
2886 found = true;
2887 }
2888
2889 /*
2890 * If the existing constraint is purely inherited (no local
2891 * definition) then interpret addition of a local constraint as a
2892 * legal merge. This allows ALTER ADD CONSTRAINT on parent and child
2893 * tables to be given in either order with same end state. However if
2894 * the relation is a partition, all inherited constraints are always
2895 * non-local, including those that were merged.
2896 */
2897 if (is_local && !con->conislocal && !rel->rd_rel->relispartition)
2898 allow_merge = true;
2899
2900 if (!found || !allow_merge)
2901 ereport(ERROR,
2902 (errcode(ERRCODE_DUPLICATE_OBJECT),
2903 errmsg("constraint \"%s\" for relation \"%s\" already exists",
2904 ccname, RelationGetRelationName(rel))));
2905
2906 /* If the child constraint is "no inherit" then cannot merge */
2907 if (con->connoinherit)
2908 ereport(ERROR,
2909 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2910 errmsg("constraint \"%s\" conflicts with non-inherited constraint on relation \"%s\"",
2911 ccname, RelationGetRelationName(rel))));
2912
2913 /*
2914 * Must not change an existing inherited constraint to "no inherit"
2915 * status. That's because inherited constraints should be able to
2916 * propagate to lower-level children.
2917 */
2918 if (con->coninhcount > 0 && is_no_inherit)
2919 ereport(ERROR,
2920 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2921 errmsg("constraint \"%s\" conflicts with inherited constraint on relation \"%s\"",
2922 ccname, RelationGetRelationName(rel))));
2923
2924 /*
2925 * If the child constraint is "not valid" then cannot merge with a
2926 * valid parent constraint.
2927 */
2928 if (is_initially_valid && !con->convalidated)
2929 ereport(ERROR,
2930 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2931 errmsg("constraint \"%s\" conflicts with NOT VALID constraint on relation \"%s\"",
2932 ccname, RelationGetRelationName(rel))));
2933
2934 /* OK to update the tuple */
2935 ereport(NOTICE,
2936 (errmsg("merging constraint \"%s\" with inherited definition",
2937 ccname)));
2938
2939 tup = heap_copytuple(tup);
2940 con = (Form_pg_constraint) GETSTRUCT(tup);
2941
2942 /*
2943 * In case of partitions, an inherited constraint must be inherited
2944 * only once since it cannot have multiple parents and it is never
2945 * considered local.
2946 */
2947 if (rel->rd_rel->relispartition)
2948 {
2949 con->coninhcount = 1;
2950 con->conislocal = false;
2951 }
2952 else
2953 {
2954 if (is_local)
2955 con->conislocal = true;
2956 else
2957 con->coninhcount++;
2958 }
2959
2960 if (is_no_inherit)
2961 {
2962 Assert(is_local);
2963 con->connoinherit = true;
2964 }
2965
2966 CatalogTupleUpdate(conDesc, &tup->t_self, tup);
2967 }
2968
2969 systable_endscan(conscan);
2970 table_close(conDesc, RowExclusiveLock);
2971
2972 return found;
2973 }
2974
2975 /*
2976 * Update the count of constraints in the relation's pg_class tuple.
2977 *
2978 * Caller had better hold exclusive lock on the relation.
2979 *
2980 * An important side effect is that a SI update message will be sent out for
2981 * the pg_class tuple, which will force other backends to rebuild their
2982 * relcache entries for the rel. Also, this backend will rebuild its
2983 * own relcache entry at the next CommandCounterIncrement.
2984 */
2985 static void
SetRelationNumChecks(Relation rel,int numchecks)2986 SetRelationNumChecks(Relation rel, int numchecks)
2987 {
2988 Relation relrel;
2989 HeapTuple reltup;
2990 Form_pg_class relStruct;
2991
2992 relrel = table_open(RelationRelationId, RowExclusiveLock);
2993 reltup = SearchSysCacheCopy1(RELOID,
2994 ObjectIdGetDatum(RelationGetRelid(rel)));
2995 if (!HeapTupleIsValid(reltup))
2996 elog(ERROR, "cache lookup failed for relation %u",
2997 RelationGetRelid(rel));
2998 relStruct = (Form_pg_class) GETSTRUCT(reltup);
2999
3000 if (relStruct->relchecks != numchecks)
3001 {
3002 relStruct->relchecks = numchecks;
3003
3004 CatalogTupleUpdate(relrel, &reltup->t_self, reltup);
3005 }
3006 else
3007 {
3008 /* Skip the disk update, but force relcache inval anyway */
3009 CacheInvalidateRelcache(rel);
3010 }
3011
3012 heap_freetuple(reltup);
3013 table_close(relrel, RowExclusiveLock);
3014 }
3015
3016 /*
3017 * Check for references to generated columns
3018 */
3019 static bool
check_nested_generated_walker(Node * node,void * context)3020 check_nested_generated_walker(Node *node, void *context)
3021 {
3022 ParseState *pstate = context;
3023
3024 if (node == NULL)
3025 return false;
3026 else if (IsA(node, Var))
3027 {
3028 Var *var = (Var *) node;
3029 Oid relid;
3030 AttrNumber attnum;
3031
3032 relid = rt_fetch(var->varno, pstate->p_rtable)->relid;
3033 if (!OidIsValid(relid))
3034 return false; /* XXX shouldn't we raise an error? */
3035
3036 attnum = var->varattno;
3037
3038 if (attnum > 0 && get_attgenerated(relid, attnum))
3039 ereport(ERROR,
3040 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3041 errmsg("cannot use generated column \"%s\" in column generation expression",
3042 get_attname(relid, attnum, false)),
3043 errdetail("A generated column cannot reference another generated column."),
3044 parser_errposition(pstate, var->location)));
3045 /* A whole-row Var is necessarily self-referential, so forbid it */
3046 if (attnum == 0)
3047 ereport(ERROR,
3048 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3049 errmsg("cannot use whole-row variable in column generation expression"),
3050 errdetail("This would cause the generated column to depend on its own value."),
3051 parser_errposition(pstate, var->location)));
3052 /* System columns were already checked in the parser */
3053
3054 return false;
3055 }
3056 else
3057 return expression_tree_walker(node, check_nested_generated_walker,
3058 (void *) context);
3059 }
3060
3061 static void
check_nested_generated(ParseState * pstate,Node * node)3062 check_nested_generated(ParseState *pstate, Node *node)
3063 {
3064 check_nested_generated_walker(node, pstate);
3065 }
3066
3067 /*
3068 * Take a raw default and convert it to a cooked format ready for
3069 * storage.
3070 *
3071 * Parse state should be set up to recognize any vars that might appear
3072 * in the expression. (Even though we plan to reject vars, it's more
3073 * user-friendly to give the correct error message than "unknown var".)
3074 *
3075 * If atttypid is not InvalidOid, coerce the expression to the specified
3076 * type (and typmod atttypmod). attname is only needed in this case:
3077 * it is used in the error message, if any.
3078 */
3079 Node *
cookDefault(ParseState * pstate,Node * raw_default,Oid atttypid,int32 atttypmod,const char * attname,char attgenerated)3080 cookDefault(ParseState *pstate,
3081 Node *raw_default,
3082 Oid atttypid,
3083 int32 atttypmod,
3084 const char *attname,
3085 char attgenerated)
3086 {
3087 Node *expr;
3088
3089 Assert(raw_default != NULL);
3090
3091 /*
3092 * Transform raw parsetree to executable expression.
3093 */
3094 expr = transformExpr(pstate, raw_default, attgenerated ? EXPR_KIND_GENERATED_COLUMN : EXPR_KIND_COLUMN_DEFAULT);
3095
3096 if (attgenerated)
3097 {
3098 check_nested_generated(pstate, expr);
3099
3100 if (contain_mutable_functions(expr))
3101 ereport(ERROR,
3102 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3103 errmsg("generation expression is not immutable")));
3104 }
3105 else
3106 {
3107 /*
3108 * For a default expression, transformExpr() should have rejected
3109 * column references.
3110 */
3111 Assert(!contain_var_clause(expr));
3112 }
3113
3114 /*
3115 * Coerce the expression to the correct type and typmod, if given. This
3116 * should match the parser's processing of non-defaulted expressions ---
3117 * see transformAssignedExpr().
3118 */
3119 if (OidIsValid(atttypid))
3120 {
3121 Oid type_id = exprType(expr);
3122
3123 expr = coerce_to_target_type(pstate, expr, type_id,
3124 atttypid, atttypmod,
3125 COERCION_ASSIGNMENT,
3126 COERCE_IMPLICIT_CAST,
3127 -1);
3128 if (expr == NULL)
3129 ereport(ERROR,
3130 (errcode(ERRCODE_DATATYPE_MISMATCH),
3131 errmsg("column \"%s\" is of type %s"
3132 " but default expression is of type %s",
3133 attname,
3134 format_type_be(atttypid),
3135 format_type_be(type_id)),
3136 errhint("You will need to rewrite or cast the expression.")));
3137 }
3138
3139 /*
3140 * Finally, take care of collations in the finished expression.
3141 */
3142 assign_expr_collations(pstate, expr);
3143
3144 return expr;
3145 }
3146
3147 /*
3148 * Take a raw CHECK constraint expression and convert it to a cooked format
3149 * ready for storage.
3150 *
3151 * Parse state must be set up to recognize any vars that might appear
3152 * in the expression.
3153 */
3154 static Node *
cookConstraint(ParseState * pstate,Node * raw_constraint,char * relname)3155 cookConstraint(ParseState *pstate,
3156 Node *raw_constraint,
3157 char *relname)
3158 {
3159 Node *expr;
3160
3161 /*
3162 * Transform raw parsetree to executable expression.
3163 */
3164 expr = transformExpr(pstate, raw_constraint, EXPR_KIND_CHECK_CONSTRAINT);
3165
3166 /*
3167 * Make sure it yields a boolean result.
3168 */
3169 expr = coerce_to_boolean(pstate, expr, "CHECK");
3170
3171 /*
3172 * Take care of collations.
3173 */
3174 assign_expr_collations(pstate, expr);
3175
3176 /*
3177 * Make sure no outside relations are referred to (this is probably dead
3178 * code now that add_missing_from is history).
3179 */
3180 if (list_length(pstate->p_rtable) != 1)
3181 ereport(ERROR,
3182 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
3183 errmsg("only table \"%s\" can be referenced in check constraint",
3184 relname)));
3185
3186 return expr;
3187 }
3188
3189 /*
3190 * CopyStatistics --- copy entries in pg_statistic from one rel to another
3191 */
3192 void
CopyStatistics(Oid fromrelid,Oid torelid)3193 CopyStatistics(Oid fromrelid, Oid torelid)
3194 {
3195 HeapTuple tup;
3196 SysScanDesc scan;
3197 ScanKeyData key[1];
3198 Relation statrel;
3199
3200 statrel = table_open(StatisticRelationId, RowExclusiveLock);
3201
3202 /* Now search for stat records */
3203 ScanKeyInit(&key[0],
3204 Anum_pg_statistic_starelid,
3205 BTEqualStrategyNumber, F_OIDEQ,
3206 ObjectIdGetDatum(fromrelid));
3207
3208 scan = systable_beginscan(statrel, StatisticRelidAttnumInhIndexId,
3209 true, NULL, 1, key);
3210
3211 while (HeapTupleIsValid((tup = systable_getnext(scan))))
3212 {
3213 Form_pg_statistic statform;
3214
3215 /* make a modifiable copy */
3216 tup = heap_copytuple(tup);
3217 statform = (Form_pg_statistic) GETSTRUCT(tup);
3218
3219 /* update the copy of the tuple and insert it */
3220 statform->starelid = torelid;
3221 CatalogTupleInsert(statrel, tup);
3222
3223 heap_freetuple(tup);
3224 }
3225
3226 systable_endscan(scan);
3227
3228 table_close(statrel, RowExclusiveLock);
3229 }
3230
3231 /*
3232 * RemoveStatistics --- remove entries in pg_statistic for a rel or column
3233 *
3234 * If attnum is zero, remove all entries for rel; else remove only the one(s)
3235 * for that column.
3236 */
3237 void
RemoveStatistics(Oid relid,AttrNumber attnum)3238 RemoveStatistics(Oid relid, AttrNumber attnum)
3239 {
3240 Relation pgstatistic;
3241 SysScanDesc scan;
3242 ScanKeyData key[2];
3243 int nkeys;
3244 HeapTuple tuple;
3245
3246 pgstatistic = table_open(StatisticRelationId, RowExclusiveLock);
3247
3248 ScanKeyInit(&key[0],
3249 Anum_pg_statistic_starelid,
3250 BTEqualStrategyNumber, F_OIDEQ,
3251 ObjectIdGetDatum(relid));
3252
3253 if (attnum == 0)
3254 nkeys = 1;
3255 else
3256 {
3257 ScanKeyInit(&key[1],
3258 Anum_pg_statistic_staattnum,
3259 BTEqualStrategyNumber, F_INT2EQ,
3260 Int16GetDatum(attnum));
3261 nkeys = 2;
3262 }
3263
3264 scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
3265 NULL, nkeys, key);
3266
3267 /* we must loop even when attnum != 0, in case of inherited stats */
3268 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
3269 CatalogTupleDelete(pgstatistic, &tuple->t_self);
3270
3271 systable_endscan(scan);
3272
3273 table_close(pgstatistic, RowExclusiveLock);
3274 }
3275
3276
3277 /*
3278 * RelationTruncateIndexes - truncate all indexes associated
3279 * with the heap relation to zero tuples.
3280 *
3281 * The routine will truncate and then reconstruct the indexes on
3282 * the specified relation. Caller must hold exclusive lock on rel.
3283 */
3284 static void
RelationTruncateIndexes(Relation heapRelation)3285 RelationTruncateIndexes(Relation heapRelation)
3286 {
3287 ListCell *indlist;
3288
3289 /* Ask the relcache to produce a list of the indexes of the rel */
3290 foreach(indlist, RelationGetIndexList(heapRelation))
3291 {
3292 Oid indexId = lfirst_oid(indlist);
3293 Relation currentIndex;
3294 IndexInfo *indexInfo;
3295
3296 /* Open the index relation; use exclusive lock, just to be sure */
3297 currentIndex = index_open(indexId, AccessExclusiveLock);
3298
3299 /*
3300 * Fetch info needed for index_build. Since we know there are no
3301 * tuples that actually need indexing, we can use a dummy IndexInfo.
3302 * This is slightly cheaper to build, but the real point is to avoid
3303 * possibly running user-defined code in index expressions or
3304 * predicates. We might be getting invoked during ON COMMIT
3305 * processing, and we don't want to run any such code then.
3306 */
3307 indexInfo = BuildDummyIndexInfo(currentIndex);
3308
3309 /*
3310 * Now truncate the actual file (and discard buffers).
3311 */
3312 RelationTruncate(currentIndex, 0);
3313
3314 /* Initialize the index and rebuild */
3315 /* Note: we do not need to re-establish pkey setting */
3316 index_build(heapRelation, currentIndex, indexInfo, true, false);
3317
3318 /* We're done with this index */
3319 index_close(currentIndex, NoLock);
3320 }
3321 }
3322
3323 /*
3324 * heap_truncate
3325 *
3326 * This routine deletes all data within all the specified relations.
3327 *
3328 * This is not transaction-safe! There is another, transaction-safe
3329 * implementation in commands/tablecmds.c. We now use this only for
3330 * ON COMMIT truncation of temporary tables, where it doesn't matter.
3331 */
3332 void
heap_truncate(List * relids)3333 heap_truncate(List *relids)
3334 {
3335 List *relations = NIL;
3336 ListCell *cell;
3337
3338 /* Open relations for processing, and grab exclusive access on each */
3339 foreach(cell, relids)
3340 {
3341 Oid rid = lfirst_oid(cell);
3342 Relation rel;
3343
3344 rel = table_open(rid, AccessExclusiveLock);
3345 relations = lappend(relations, rel);
3346 }
3347
3348 /* Don't allow truncate on tables that are referenced by foreign keys */
3349 heap_truncate_check_FKs(relations, true);
3350
3351 /* OK to do it */
3352 foreach(cell, relations)
3353 {
3354 Relation rel = lfirst(cell);
3355
3356 /* Truncate the relation */
3357 heap_truncate_one_rel(rel);
3358
3359 /* Close the relation, but keep exclusive lock on it until commit */
3360 table_close(rel, NoLock);
3361 }
3362 }
3363
3364 /*
3365 * heap_truncate_one_rel
3366 *
3367 * This routine deletes all data within the specified relation.
3368 *
3369 * This is not transaction-safe, because the truncation is done immediately
3370 * and cannot be rolled back later. Caller is responsible for having
3371 * checked permissions etc, and must have obtained AccessExclusiveLock.
3372 */
3373 void
heap_truncate_one_rel(Relation rel)3374 heap_truncate_one_rel(Relation rel)
3375 {
3376 Oid toastrelid;
3377
3378 /*
3379 * Truncate the relation. Partitioned tables have no storage, so there is
3380 * nothing to do for them here.
3381 */
3382 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3383 return;
3384
3385 /* Truncate the underlying relation */
3386 table_relation_nontransactional_truncate(rel);
3387
3388 /* If the relation has indexes, truncate the indexes too */
3389 RelationTruncateIndexes(rel);
3390
3391 /* If there is a toast table, truncate that too */
3392 toastrelid = rel->rd_rel->reltoastrelid;
3393 if (OidIsValid(toastrelid))
3394 {
3395 Relation toastrel = table_open(toastrelid, AccessExclusiveLock);
3396
3397 table_relation_nontransactional_truncate(toastrel);
3398 RelationTruncateIndexes(toastrel);
3399 /* keep the lock... */
3400 table_close(toastrel, NoLock);
3401 }
3402 }
3403
3404 /*
3405 * heap_truncate_check_FKs
3406 * Check for foreign keys referencing a list of relations that
3407 * are to be truncated, and raise error if there are any
3408 *
3409 * We disallow such FKs (except self-referential ones) since the whole point
3410 * of TRUNCATE is to not scan the individual rows to be thrown away.
3411 *
3412 * This is split out so it can be shared by both implementations of truncate.
3413 * Caller should already hold a suitable lock on the relations.
3414 *
3415 * tempTables is only used to select an appropriate error message.
3416 */
3417 void
heap_truncate_check_FKs(List * relations,bool tempTables)3418 heap_truncate_check_FKs(List *relations, bool tempTables)
3419 {
3420 List *oids = NIL;
3421 List *dependents;
3422 ListCell *cell;
3423
3424 /*
3425 * Build a list of OIDs of the interesting relations.
3426 *
3427 * If a relation has no triggers, then it can neither have FKs nor be
3428 * referenced by a FK from another table, so we can ignore it. For
3429 * partitioned tables, FKs have no triggers, so we must include them
3430 * anyway.
3431 */
3432 foreach(cell, relations)
3433 {
3434 Relation rel = lfirst(cell);
3435
3436 if (rel->rd_rel->relhastriggers ||
3437 rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3438 oids = lappend_oid(oids, RelationGetRelid(rel));
3439 }
3440
3441 /*
3442 * Fast path: if no relation has triggers, none has FKs either.
3443 */
3444 if (oids == NIL)
3445 return;
3446
3447 /*
3448 * Otherwise, must scan pg_constraint. We make one pass with all the
3449 * relations considered; if this finds nothing, then all is well.
3450 */
3451 dependents = heap_truncate_find_FKs(oids);
3452 if (dependents == NIL)
3453 return;
3454
3455 /*
3456 * Otherwise we repeat the scan once per relation to identify a particular
3457 * pair of relations to complain about. This is pretty slow, but
3458 * performance shouldn't matter much in a failure path. The reason for
3459 * doing things this way is to ensure that the message produced is not
3460 * dependent on chance row locations within pg_constraint.
3461 */
3462 foreach(cell, oids)
3463 {
3464 Oid relid = lfirst_oid(cell);
3465 ListCell *cell2;
3466
3467 dependents = heap_truncate_find_FKs(list_make1_oid(relid));
3468
3469 foreach(cell2, dependents)
3470 {
3471 Oid relid2 = lfirst_oid(cell2);
3472
3473 if (!list_member_oid(oids, relid2))
3474 {
3475 char *relname = get_rel_name(relid);
3476 char *relname2 = get_rel_name(relid2);
3477
3478 if (tempTables)
3479 ereport(ERROR,
3480 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3481 errmsg("unsupported ON COMMIT and foreign key combination"),
3482 errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
3483 relname2, relname)));
3484 else
3485 ereport(ERROR,
3486 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3487 errmsg("cannot truncate a table referenced in a foreign key constraint"),
3488 errdetail("Table \"%s\" references \"%s\".",
3489 relname2, relname),
3490 errhint("Truncate table \"%s\" at the same time, "
3491 "or use TRUNCATE ... CASCADE.",
3492 relname2)));
3493 }
3494 }
3495 }
3496 }
3497
3498 /*
3499 * heap_truncate_find_FKs
3500 * Find relations having foreign keys referencing any of the given rels
3501 *
3502 * Input and result are both lists of relation OIDs. The result contains
3503 * no duplicates, does *not* include any rels that were already in the input
3504 * list, and is sorted in OID order. (The last property is enforced mainly
3505 * to guarantee consistent behavior in the regression tests; we don't want
3506 * behavior to change depending on chance locations of rows in pg_constraint.)
3507 *
3508 * Note: caller should already have appropriate lock on all rels mentioned
3509 * in relationIds. Since adding or dropping an FK requires exclusive lock
3510 * on both rels, this ensures that the answer will be stable.
3511 */
3512 List *
heap_truncate_find_FKs(List * relationIds)3513 heap_truncate_find_FKs(List *relationIds)
3514 {
3515 List *result = NIL;
3516 List *oids;
3517 List *parent_cons;
3518 ListCell *cell;
3519 ScanKeyData key;
3520 Relation fkeyRel;
3521 SysScanDesc fkeyScan;
3522 HeapTuple tuple;
3523 bool restart;
3524
3525 oids = list_copy(relationIds);
3526
3527 /*
3528 * Must scan pg_constraint. Right now, it is a seqscan because there is
3529 * no available index on confrelid.
3530 */
3531 fkeyRel = table_open(ConstraintRelationId, AccessShareLock);
3532
3533 restart:
3534 restart = false;
3535 parent_cons = NIL;
3536
3537 fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
3538 NULL, 0, NULL);
3539
3540 while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
3541 {
3542 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
3543
3544 /* Not a foreign key */
3545 if (con->contype != CONSTRAINT_FOREIGN)
3546 continue;
3547
3548 /* Not referencing one of our list of tables */
3549 if (!list_member_oid(oids, con->confrelid))
3550 continue;
3551
3552 /*
3553 * If this constraint has a parent constraint which we have not seen
3554 * yet, keep track of it for the second loop, below. Tracking parent
3555 * constraints allows us to climb up to the top-level level constraint
3556 * and look for all possible relations referencing the partitioned
3557 * table.
3558 */
3559 if (OidIsValid(con->conparentid) &&
3560 !list_member_oid(parent_cons, con->conparentid))
3561 parent_cons = lappend_oid(parent_cons, con->conparentid);
3562
3563 /*
3564 * Add referencer to result, unless present in input list. (Don't
3565 * worry about dupes: we'll fix that below).
3566 */
3567 if (!list_member_oid(relationIds, con->conrelid))
3568 result = lappend_oid(result, con->conrelid);
3569 }
3570
3571 systable_endscan(fkeyScan);
3572
3573 /*
3574 * Process each parent constraint we found to add the list of referenced
3575 * relations by them to the oids list. If we do add any new such
3576 * relations, redo the first loop above. Also, if we see that the parent
3577 * constraint in turn has a parent, add that so that we process all
3578 * relations in a single additional pass.
3579 */
3580 foreach(cell, parent_cons)
3581 {
3582 Oid parent = lfirst_oid(cell);
3583
3584 ScanKeyInit(&key,
3585 Anum_pg_constraint_oid,
3586 BTEqualStrategyNumber, F_OIDEQ,
3587 ObjectIdGetDatum(parent));
3588
3589 fkeyScan = systable_beginscan(fkeyRel, ConstraintOidIndexId,
3590 true, NULL, 1, &key);
3591
3592 tuple = systable_getnext(fkeyScan);
3593 if (HeapTupleIsValid(tuple))
3594 {
3595 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
3596
3597 /*
3598 * pg_constraint rows always appear for partitioned hierarchies
3599 * this way: on the each side of the constraint, one row appears
3600 * for each partition that points to the top-most table on the
3601 * other side.
3602 *
3603 * Because of this arrangement, we can correctly catch all
3604 * relevant relations by adding to 'parent_cons' all rows with
3605 * valid conparentid, and to the 'oids' list all rows with a zero
3606 * conparentid. If any oids are added to 'oids', redo the first
3607 * loop above by setting 'restart'.
3608 */
3609 if (OidIsValid(con->conparentid))
3610 parent_cons = list_append_unique_oid(parent_cons,
3611 con->conparentid);
3612 else if (!list_member_oid(oids, con->confrelid))
3613 {
3614 oids = lappend_oid(oids, con->confrelid);
3615 restart = true;
3616 }
3617 }
3618
3619 systable_endscan(fkeyScan);
3620 }
3621
3622 list_free(parent_cons);
3623 if (restart)
3624 goto restart;
3625
3626 table_close(fkeyRel, AccessShareLock);
3627 list_free(oids);
3628
3629 /* Now sort and de-duplicate the result list */
3630 list_sort(result, list_oid_cmp);
3631 list_deduplicate_oid(result);
3632
3633 return result;
3634 }
3635
3636 /*
3637 * StorePartitionKey
3638 * Store information about the partition key rel into the catalog
3639 */
3640 void
StorePartitionKey(Relation rel,char strategy,int16 partnatts,AttrNumber * partattrs,List * partexprs,Oid * partopclass,Oid * partcollation)3641 StorePartitionKey(Relation rel,
3642 char strategy,
3643 int16 partnatts,
3644 AttrNumber *partattrs,
3645 List *partexprs,
3646 Oid *partopclass,
3647 Oid *partcollation)
3648 {
3649 int i;
3650 int2vector *partattrs_vec;
3651 oidvector *partopclass_vec;
3652 oidvector *partcollation_vec;
3653 Datum partexprDatum;
3654 Relation pg_partitioned_table;
3655 HeapTuple tuple;
3656 Datum values[Natts_pg_partitioned_table];
3657 bool nulls[Natts_pg_partitioned_table];
3658 ObjectAddress myself;
3659 ObjectAddress referenced;
3660 ObjectAddresses *addrs;
3661
3662 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
3663
3664 /* Copy the partition attribute numbers, opclass OIDs into arrays */
3665 partattrs_vec = buildint2vector(partattrs, partnatts);
3666 partopclass_vec = buildoidvector(partopclass, partnatts);
3667 partcollation_vec = buildoidvector(partcollation, partnatts);
3668
3669 /* Convert the expressions (if any) to a text datum */
3670 if (partexprs)
3671 {
3672 char *exprString;
3673
3674 exprString = nodeToString(partexprs);
3675 partexprDatum = CStringGetTextDatum(exprString);
3676 pfree(exprString);
3677 }
3678 else
3679 partexprDatum = (Datum) 0;
3680
3681 pg_partitioned_table = table_open(PartitionedRelationId, RowExclusiveLock);
3682
3683 MemSet(nulls, false, sizeof(nulls));
3684
3685 /* Only this can ever be NULL */
3686 if (!partexprDatum)
3687 nulls[Anum_pg_partitioned_table_partexprs - 1] = true;
3688
3689 values[Anum_pg_partitioned_table_partrelid - 1] = ObjectIdGetDatum(RelationGetRelid(rel));
3690 values[Anum_pg_partitioned_table_partstrat - 1] = CharGetDatum(strategy);
3691 values[Anum_pg_partitioned_table_partnatts - 1] = Int16GetDatum(partnatts);
3692 values[Anum_pg_partitioned_table_partdefid - 1] = ObjectIdGetDatum(InvalidOid);
3693 values[Anum_pg_partitioned_table_partattrs - 1] = PointerGetDatum(partattrs_vec);
3694 values[Anum_pg_partitioned_table_partclass - 1] = PointerGetDatum(partopclass_vec);
3695 values[Anum_pg_partitioned_table_partcollation - 1] = PointerGetDatum(partcollation_vec);
3696 values[Anum_pg_partitioned_table_partexprs - 1] = partexprDatum;
3697
3698 tuple = heap_form_tuple(RelationGetDescr(pg_partitioned_table), values, nulls);
3699
3700 CatalogTupleInsert(pg_partitioned_table, tuple);
3701 table_close(pg_partitioned_table, RowExclusiveLock);
3702
3703 /* Mark this relation as dependent on a few things as follows */
3704 addrs = new_object_addresses();
3705 ObjectAddressSet(myself, RelationRelationId, RelationGetRelid(rel));
3706
3707 /* Operator class and collation per key column */
3708 for (i = 0; i < partnatts; i++)
3709 {
3710 ObjectAddressSet(referenced, OperatorClassRelationId, partopclass[i]);
3711 add_exact_object_address(&referenced, addrs);
3712
3713 /* The default collation is pinned, so don't bother recording it */
3714 if (OidIsValid(partcollation[i]) &&
3715 partcollation[i] != DEFAULT_COLLATION_OID)
3716 {
3717 ObjectAddressSet(referenced, CollationRelationId, partcollation[i]);
3718 add_exact_object_address(&referenced, addrs);
3719 }
3720 }
3721
3722 record_object_address_dependencies(&myself, addrs, DEPENDENCY_NORMAL);
3723 free_object_addresses(addrs);
3724
3725 /*
3726 * The partitioning columns are made internally dependent on the table,
3727 * because we cannot drop any of them without dropping the whole table.
3728 * (ATExecDropColumn independently enforces that, but it's not bulletproof
3729 * so we need the dependencies too.)
3730 */
3731 for (i = 0; i < partnatts; i++)
3732 {
3733 if (partattrs[i] == 0)
3734 continue; /* ignore expressions here */
3735
3736 ObjectAddressSubSet(referenced, RelationRelationId,
3737 RelationGetRelid(rel), partattrs[i]);
3738 recordDependencyOn(&referenced, &myself, DEPENDENCY_INTERNAL);
3739 }
3740
3741 /*
3742 * Also consider anything mentioned in partition expressions. External
3743 * references (e.g. functions) get NORMAL dependencies. Table columns
3744 * mentioned in the expressions are handled the same as plain partitioning
3745 * columns, i.e. they become internally dependent on the whole table.
3746 */
3747 if (partexprs)
3748 recordDependencyOnSingleRelExpr(&myself,
3749 (Node *) partexprs,
3750 RelationGetRelid(rel),
3751 DEPENDENCY_NORMAL,
3752 DEPENDENCY_INTERNAL,
3753 true /* reverse the self-deps */ );
3754
3755 /*
3756 * We must invalidate the relcache so that the next
3757 * CommandCounterIncrement() will cause the same to be rebuilt using the
3758 * information in just created catalog entry.
3759 */
3760 CacheInvalidateRelcache(rel);
3761 }
3762
3763 /*
3764 * RemovePartitionKeyByRelId
3765 * Remove pg_partitioned_table entry for a relation
3766 */
3767 void
RemovePartitionKeyByRelId(Oid relid)3768 RemovePartitionKeyByRelId(Oid relid)
3769 {
3770 Relation rel;
3771 HeapTuple tuple;
3772
3773 rel = table_open(PartitionedRelationId, RowExclusiveLock);
3774
3775 tuple = SearchSysCache1(PARTRELID, ObjectIdGetDatum(relid));
3776 if (!HeapTupleIsValid(tuple))
3777 elog(ERROR, "cache lookup failed for partition key of relation %u",
3778 relid);
3779
3780 CatalogTupleDelete(rel, &tuple->t_self);
3781
3782 ReleaseSysCache(tuple);
3783 table_close(rel, RowExclusiveLock);
3784 }
3785
3786 /*
3787 * StorePartitionBound
3788 * Update pg_class tuple of rel to store the partition bound and set
3789 * relispartition to true
3790 *
3791 * If this is the default partition, also update the default partition OID in
3792 * pg_partitioned_table.
3793 *
3794 * Also, invalidate the parent's relcache, so that the next rebuild will load
3795 * the new partition's info into its partition descriptor. If there is a
3796 * default partition, we must invalidate its relcache entry as well.
3797 */
3798 void
StorePartitionBound(Relation rel,Relation parent,PartitionBoundSpec * bound)3799 StorePartitionBound(Relation rel, Relation parent, PartitionBoundSpec *bound)
3800 {
3801 Relation classRel;
3802 HeapTuple tuple,
3803 newtuple;
3804 Datum new_val[Natts_pg_class];
3805 bool new_null[Natts_pg_class],
3806 new_repl[Natts_pg_class];
3807 Oid defaultPartOid;
3808
3809 /* Update pg_class tuple */
3810 classRel = table_open(RelationRelationId, RowExclusiveLock);
3811 tuple = SearchSysCacheCopy1(RELOID,
3812 ObjectIdGetDatum(RelationGetRelid(rel)));
3813 if (!HeapTupleIsValid(tuple))
3814 elog(ERROR, "cache lookup failed for relation %u",
3815 RelationGetRelid(rel));
3816
3817 #ifdef USE_ASSERT_CHECKING
3818 {
3819 Form_pg_class classForm;
3820 bool isnull;
3821
3822 classForm = (Form_pg_class) GETSTRUCT(tuple);
3823 Assert(!classForm->relispartition);
3824 (void) SysCacheGetAttr(RELOID, tuple, Anum_pg_class_relpartbound,
3825 &isnull);
3826 Assert(isnull);
3827 }
3828 #endif
3829
3830 /* Fill in relpartbound value */
3831 memset(new_val, 0, sizeof(new_val));
3832 memset(new_null, false, sizeof(new_null));
3833 memset(new_repl, false, sizeof(new_repl));
3834 new_val[Anum_pg_class_relpartbound - 1] = CStringGetTextDatum(nodeToString(bound));
3835 new_null[Anum_pg_class_relpartbound - 1] = false;
3836 new_repl[Anum_pg_class_relpartbound - 1] = true;
3837 newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
3838 new_val, new_null, new_repl);
3839 /* Also set the flag */
3840 ((Form_pg_class) GETSTRUCT(newtuple))->relispartition = true;
3841 CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
3842 heap_freetuple(newtuple);
3843 table_close(classRel, RowExclusiveLock);
3844
3845 /*
3846 * If we're storing bounds for the default partition, update
3847 * pg_partitioned_table too.
3848 */
3849 if (bound->is_default)
3850 update_default_partition_oid(RelationGetRelid(parent),
3851 RelationGetRelid(rel));
3852
3853 /* Make these updates visible */
3854 CommandCounterIncrement();
3855
3856 /*
3857 * The partition constraint for the default partition depends on the
3858 * partition bounds of every other partition, so we must invalidate the
3859 * relcache entry for that partition every time a partition is added or
3860 * removed.
3861 */
3862 defaultPartOid =
3863 get_default_oid_from_partdesc(RelationGetPartitionDesc(parent, true));
3864 if (OidIsValid(defaultPartOid))
3865 CacheInvalidateRelcacheByRelid(defaultPartOid);
3866
3867 CacheInvalidateRelcache(parent);
3868 }
3869