1 /*-------------------------------------------------------------------------
2 *
3 * pg_constraint.c
4 * routines to support manipulation of the pg_constraint relation
5 *
6 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/catalog/pg_constraint.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/sysattr.h"
20 #include "access/table.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/indexing.h"
25 #include "catalog/objectaccess.h"
26 #include "catalog/pg_constraint.h"
27 #include "catalog/pg_operator.h"
28 #include "catalog/pg_type.h"
29 #include "commands/defrem.h"
30 #include "commands/tablecmds.h"
31 #include "utils/array.h"
32 #include "utils/builtins.h"
33 #include "utils/fmgroids.h"
34 #include "utils/lsyscache.h"
35 #include "utils/rel.h"
36 #include "utils/syscache.h"
37
38
39 /*
40 * CreateConstraintEntry
41 * Create a constraint table entry.
42 *
43 * Subsidiary records (such as triggers or indexes to implement the
44 * constraint) are *not* created here. But we do make dependency links
45 * from the constraint to the things it depends on.
46 *
47 * The new constraint's OID is returned.
48 */
49 Oid
CreateConstraintEntry(const char * constraintName,Oid constraintNamespace,char constraintType,bool isDeferrable,bool isDeferred,bool isValidated,Oid parentConstrId,Oid relId,const int16 * constraintKey,int constraintNKeys,int constraintNTotalKeys,Oid domainId,Oid indexRelId,Oid foreignRelId,const int16 * foreignKey,const Oid * pfEqOp,const Oid * ppEqOp,const Oid * ffEqOp,int foreignNKeys,char foreignUpdateType,char foreignDeleteType,char foreignMatchType,const Oid * exclOp,Node * conExpr,const char * conBin,bool conIsLocal,int conInhCount,bool conNoInherit,bool is_internal)50 CreateConstraintEntry(const char *constraintName,
51 Oid constraintNamespace,
52 char constraintType,
53 bool isDeferrable,
54 bool isDeferred,
55 bool isValidated,
56 Oid parentConstrId,
57 Oid relId,
58 const int16 *constraintKey,
59 int constraintNKeys,
60 int constraintNTotalKeys,
61 Oid domainId,
62 Oid indexRelId,
63 Oid foreignRelId,
64 const int16 *foreignKey,
65 const Oid *pfEqOp,
66 const Oid *ppEqOp,
67 const Oid *ffEqOp,
68 int foreignNKeys,
69 char foreignUpdateType,
70 char foreignDeleteType,
71 char foreignMatchType,
72 const Oid *exclOp,
73 Node *conExpr,
74 const char *conBin,
75 bool conIsLocal,
76 int conInhCount,
77 bool conNoInherit,
78 bool is_internal)
79 {
80 Relation conDesc;
81 Oid conOid;
82 HeapTuple tup;
83 bool nulls[Natts_pg_constraint];
84 Datum values[Natts_pg_constraint];
85 ArrayType *conkeyArray;
86 ArrayType *confkeyArray;
87 ArrayType *conpfeqopArray;
88 ArrayType *conppeqopArray;
89 ArrayType *conffeqopArray;
90 ArrayType *conexclopArray;
91 NameData cname;
92 int i;
93 ObjectAddress conobject;
94
95 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
96
97 Assert(constraintName);
98 namestrcpy(&cname, constraintName);
99
100 /*
101 * Convert C arrays into Postgres arrays.
102 */
103 if (constraintNKeys > 0)
104 {
105 Datum *conkey;
106
107 conkey = (Datum *) palloc(constraintNKeys * sizeof(Datum));
108 for (i = 0; i < constraintNKeys; i++)
109 conkey[i] = Int16GetDatum(constraintKey[i]);
110 conkeyArray = construct_array(conkey, constraintNKeys,
111 INT2OID, 2, true, TYPALIGN_SHORT);
112 }
113 else
114 conkeyArray = NULL;
115
116 if (foreignNKeys > 0)
117 {
118 Datum *fkdatums;
119
120 fkdatums = (Datum *) palloc(foreignNKeys * sizeof(Datum));
121 for (i = 0; i < foreignNKeys; i++)
122 fkdatums[i] = Int16GetDatum(foreignKey[i]);
123 confkeyArray = construct_array(fkdatums, foreignNKeys,
124 INT2OID, 2, true, TYPALIGN_SHORT);
125 for (i = 0; i < foreignNKeys; i++)
126 fkdatums[i] = ObjectIdGetDatum(pfEqOp[i]);
127 conpfeqopArray = construct_array(fkdatums, foreignNKeys,
128 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
129 for (i = 0; i < foreignNKeys; i++)
130 fkdatums[i] = ObjectIdGetDatum(ppEqOp[i]);
131 conppeqopArray = construct_array(fkdatums, foreignNKeys,
132 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
133 for (i = 0; i < foreignNKeys; i++)
134 fkdatums[i] = ObjectIdGetDatum(ffEqOp[i]);
135 conffeqopArray = construct_array(fkdatums, foreignNKeys,
136 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
137 }
138 else
139 {
140 confkeyArray = NULL;
141 conpfeqopArray = NULL;
142 conppeqopArray = NULL;
143 conffeqopArray = NULL;
144 }
145
146 if (exclOp != NULL)
147 {
148 Datum *opdatums;
149
150 opdatums = (Datum *) palloc(constraintNKeys * sizeof(Datum));
151 for (i = 0; i < constraintNKeys; i++)
152 opdatums[i] = ObjectIdGetDatum(exclOp[i]);
153 conexclopArray = construct_array(opdatums, constraintNKeys,
154 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
155 }
156 else
157 conexclopArray = NULL;
158
159 /* initialize nulls and values */
160 for (i = 0; i < Natts_pg_constraint; i++)
161 {
162 nulls[i] = false;
163 values[i] = (Datum) NULL;
164 }
165
166 conOid = GetNewOidWithIndex(conDesc, ConstraintOidIndexId,
167 Anum_pg_constraint_oid);
168 values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(conOid);
169 values[Anum_pg_constraint_conname - 1] = NameGetDatum(&cname);
170 values[Anum_pg_constraint_connamespace - 1] = ObjectIdGetDatum(constraintNamespace);
171 values[Anum_pg_constraint_contype - 1] = CharGetDatum(constraintType);
172 values[Anum_pg_constraint_condeferrable - 1] = BoolGetDatum(isDeferrable);
173 values[Anum_pg_constraint_condeferred - 1] = BoolGetDatum(isDeferred);
174 values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValidated);
175 values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(relId);
176 values[Anum_pg_constraint_contypid - 1] = ObjectIdGetDatum(domainId);
177 values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(indexRelId);
178 values[Anum_pg_constraint_conparentid - 1] = ObjectIdGetDatum(parentConstrId);
179 values[Anum_pg_constraint_confrelid - 1] = ObjectIdGetDatum(foreignRelId);
180 values[Anum_pg_constraint_confupdtype - 1] = CharGetDatum(foreignUpdateType);
181 values[Anum_pg_constraint_confdeltype - 1] = CharGetDatum(foreignDeleteType);
182 values[Anum_pg_constraint_confmatchtype - 1] = CharGetDatum(foreignMatchType);
183 values[Anum_pg_constraint_conislocal - 1] = BoolGetDatum(conIsLocal);
184 values[Anum_pg_constraint_coninhcount - 1] = Int32GetDatum(conInhCount);
185 values[Anum_pg_constraint_connoinherit - 1] = BoolGetDatum(conNoInherit);
186
187 if (conkeyArray)
188 values[Anum_pg_constraint_conkey - 1] = PointerGetDatum(conkeyArray);
189 else
190 nulls[Anum_pg_constraint_conkey - 1] = true;
191
192 if (confkeyArray)
193 values[Anum_pg_constraint_confkey - 1] = PointerGetDatum(confkeyArray);
194 else
195 nulls[Anum_pg_constraint_confkey - 1] = true;
196
197 if (conpfeqopArray)
198 values[Anum_pg_constraint_conpfeqop - 1] = PointerGetDatum(conpfeqopArray);
199 else
200 nulls[Anum_pg_constraint_conpfeqop - 1] = true;
201
202 if (conppeqopArray)
203 values[Anum_pg_constraint_conppeqop - 1] = PointerGetDatum(conppeqopArray);
204 else
205 nulls[Anum_pg_constraint_conppeqop - 1] = true;
206
207 if (conffeqopArray)
208 values[Anum_pg_constraint_conffeqop - 1] = PointerGetDatum(conffeqopArray);
209 else
210 nulls[Anum_pg_constraint_conffeqop - 1] = true;
211
212 if (conexclopArray)
213 values[Anum_pg_constraint_conexclop - 1] = PointerGetDatum(conexclopArray);
214 else
215 nulls[Anum_pg_constraint_conexclop - 1] = true;
216
217 if (conBin)
218 values[Anum_pg_constraint_conbin - 1] = CStringGetTextDatum(conBin);
219 else
220 nulls[Anum_pg_constraint_conbin - 1] = true;
221
222 tup = heap_form_tuple(RelationGetDescr(conDesc), values, nulls);
223
224 CatalogTupleInsert(conDesc, tup);
225
226 conobject.classId = ConstraintRelationId;
227 conobject.objectId = conOid;
228 conobject.objectSubId = 0;
229
230 table_close(conDesc, RowExclusiveLock);
231
232 if (OidIsValid(relId))
233 {
234 /*
235 * Register auto dependency from constraint to owning relation, or to
236 * specific column(s) if any are mentioned.
237 */
238 ObjectAddress relobject;
239
240 relobject.classId = RelationRelationId;
241 relobject.objectId = relId;
242 if (constraintNTotalKeys > 0)
243 {
244 for (i = 0; i < constraintNTotalKeys; i++)
245 {
246 relobject.objectSubId = constraintKey[i];
247
248 recordDependencyOn(&conobject, &relobject, DEPENDENCY_AUTO);
249 }
250 }
251 else
252 {
253 relobject.objectSubId = 0;
254
255 recordDependencyOn(&conobject, &relobject, DEPENDENCY_AUTO);
256 }
257 }
258
259 if (OidIsValid(domainId))
260 {
261 /*
262 * Register auto dependency from constraint to owning domain
263 */
264 ObjectAddress domobject;
265
266 domobject.classId = TypeRelationId;
267 domobject.objectId = domainId;
268 domobject.objectSubId = 0;
269
270 recordDependencyOn(&conobject, &domobject, DEPENDENCY_AUTO);
271 }
272
273 if (OidIsValid(foreignRelId))
274 {
275 /*
276 * Register normal dependency from constraint to foreign relation, or
277 * to specific column(s) if any are mentioned.
278 */
279 ObjectAddress relobject;
280
281 relobject.classId = RelationRelationId;
282 relobject.objectId = foreignRelId;
283 if (foreignNKeys > 0)
284 {
285 for (i = 0; i < foreignNKeys; i++)
286 {
287 relobject.objectSubId = foreignKey[i];
288
289 recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
290 }
291 }
292 else
293 {
294 relobject.objectSubId = 0;
295
296 recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
297 }
298 }
299
300 if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN)
301 {
302 /*
303 * Register normal dependency on the unique index that supports a
304 * foreign-key constraint. (Note: for indexes associated with unique
305 * or primary-key constraints, the dependency runs the other way, and
306 * is not made here.)
307 */
308 ObjectAddress relobject;
309
310 relobject.classId = RelationRelationId;
311 relobject.objectId = indexRelId;
312 relobject.objectSubId = 0;
313
314 recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
315 }
316
317 if (foreignNKeys > 0)
318 {
319 /*
320 * Register normal dependencies on the equality operators that support
321 * a foreign-key constraint. If the PK and FK types are the same then
322 * all three operators for a column are the same; otherwise they are
323 * different.
324 */
325 ObjectAddress oprobject;
326
327 oprobject.classId = OperatorRelationId;
328 oprobject.objectSubId = 0;
329
330 for (i = 0; i < foreignNKeys; i++)
331 {
332 oprobject.objectId = pfEqOp[i];
333 recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
334 if (ppEqOp[i] != pfEqOp[i])
335 {
336 oprobject.objectId = ppEqOp[i];
337 recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
338 }
339 if (ffEqOp[i] != pfEqOp[i])
340 {
341 oprobject.objectId = ffEqOp[i];
342 recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
343 }
344 }
345 }
346
347 /*
348 * We don't bother to register dependencies on the exclusion operators of
349 * an exclusion constraint. We assume they are members of the opclass
350 * supporting the index, so there's an indirect dependency via that. (This
351 * would be pretty dicey for cross-type operators, but exclusion operators
352 * can never be cross-type.)
353 */
354
355 if (conExpr != NULL)
356 {
357 /*
358 * Register dependencies from constraint to objects mentioned in CHECK
359 * expression.
360 */
361 recordDependencyOnSingleRelExpr(&conobject, conExpr, relId,
362 DEPENDENCY_NORMAL,
363 DEPENDENCY_NORMAL, false);
364 }
365
366 /* Post creation hook for new constraint */
367 InvokeObjectPostCreateHookArg(ConstraintRelationId, conOid, 0,
368 is_internal);
369
370 return conOid;
371 }
372
373 /*
374 * Test whether given name is currently used as a constraint name
375 * for the given object (relation or domain).
376 *
377 * This is used to decide whether to accept a user-specified constraint name.
378 * It is deliberately not the same test as ChooseConstraintName uses to decide
379 * whether an auto-generated name is OK: here, we will allow it unless there
380 * is an identical constraint name in use *on the same object*.
381 *
382 * NB: Caller should hold exclusive lock on the given object, else
383 * this test can be fooled by concurrent additions.
384 */
385 bool
ConstraintNameIsUsed(ConstraintCategory conCat,Oid objId,const char * conname)386 ConstraintNameIsUsed(ConstraintCategory conCat, Oid objId,
387 const char *conname)
388 {
389 bool found;
390 Relation conDesc;
391 SysScanDesc conscan;
392 ScanKeyData skey[3];
393
394 conDesc = table_open(ConstraintRelationId, AccessShareLock);
395
396 ScanKeyInit(&skey[0],
397 Anum_pg_constraint_conrelid,
398 BTEqualStrategyNumber, F_OIDEQ,
399 ObjectIdGetDatum((conCat == CONSTRAINT_RELATION)
400 ? objId : InvalidOid));
401 ScanKeyInit(&skey[1],
402 Anum_pg_constraint_contypid,
403 BTEqualStrategyNumber, F_OIDEQ,
404 ObjectIdGetDatum((conCat == CONSTRAINT_DOMAIN)
405 ? objId : InvalidOid));
406 ScanKeyInit(&skey[2],
407 Anum_pg_constraint_conname,
408 BTEqualStrategyNumber, F_NAMEEQ,
409 CStringGetDatum(conname));
410
411 conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId,
412 true, NULL, 3, skey);
413
414 /* There can be at most one matching row */
415 found = (HeapTupleIsValid(systable_getnext(conscan)));
416
417 systable_endscan(conscan);
418 table_close(conDesc, AccessShareLock);
419
420 return found;
421 }
422
423 /*
424 * Does any constraint of the given name exist in the given namespace?
425 *
426 * This is used for code that wants to match ChooseConstraintName's rule
427 * that we should avoid autogenerating duplicate constraint names within a
428 * namespace.
429 */
430 bool
ConstraintNameExists(const char * conname,Oid namespaceid)431 ConstraintNameExists(const char *conname, Oid namespaceid)
432 {
433 bool found;
434 Relation conDesc;
435 SysScanDesc conscan;
436 ScanKeyData skey[2];
437
438 conDesc = table_open(ConstraintRelationId, AccessShareLock);
439
440 ScanKeyInit(&skey[0],
441 Anum_pg_constraint_conname,
442 BTEqualStrategyNumber, F_NAMEEQ,
443 CStringGetDatum(conname));
444
445 ScanKeyInit(&skey[1],
446 Anum_pg_constraint_connamespace,
447 BTEqualStrategyNumber, F_OIDEQ,
448 ObjectIdGetDatum(namespaceid));
449
450 conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
451 NULL, 2, skey);
452
453 found = (HeapTupleIsValid(systable_getnext(conscan)));
454
455 systable_endscan(conscan);
456 table_close(conDesc, AccessShareLock);
457
458 return found;
459 }
460
461 /*
462 * Select a nonconflicting name for a new constraint.
463 *
464 * The objective here is to choose a name that is unique within the
465 * specified namespace. Postgres does not require this, but the SQL
466 * spec does, and some apps depend on it. Therefore we avoid choosing
467 * default names that so conflict.
468 *
469 * name1, name2, and label are used the same way as for makeObjectName(),
470 * except that the label can't be NULL; digits will be appended to the label
471 * if needed to create a name that is unique within the specified namespace.
472 *
473 * 'others' can be a list of string names already chosen within the current
474 * command (but not yet reflected into the catalogs); we will not choose
475 * a duplicate of one of these either.
476 *
477 * Note: it is theoretically possible to get a collision anyway, if someone
478 * else chooses the same name concurrently. This is fairly unlikely to be
479 * a problem in practice, especially if one is holding an exclusive lock on
480 * the relation identified by name1.
481 *
482 * Returns a palloc'd string.
483 */
484 char *
ChooseConstraintName(const char * name1,const char * name2,const char * label,Oid namespaceid,List * others)485 ChooseConstraintName(const char *name1, const char *name2,
486 const char *label, Oid namespaceid,
487 List *others)
488 {
489 int pass = 0;
490 char *conname = NULL;
491 char modlabel[NAMEDATALEN];
492 Relation conDesc;
493 SysScanDesc conscan;
494 ScanKeyData skey[2];
495 bool found;
496 ListCell *l;
497
498 conDesc = table_open(ConstraintRelationId, AccessShareLock);
499
500 /* try the unmodified label first */
501 StrNCpy(modlabel, label, sizeof(modlabel));
502
503 for (;;)
504 {
505 conname = makeObjectName(name1, name2, modlabel);
506
507 found = false;
508
509 foreach(l, others)
510 {
511 if (strcmp((char *) lfirst(l), conname) == 0)
512 {
513 found = true;
514 break;
515 }
516 }
517
518 if (!found)
519 {
520 ScanKeyInit(&skey[0],
521 Anum_pg_constraint_conname,
522 BTEqualStrategyNumber, F_NAMEEQ,
523 CStringGetDatum(conname));
524
525 ScanKeyInit(&skey[1],
526 Anum_pg_constraint_connamespace,
527 BTEqualStrategyNumber, F_OIDEQ,
528 ObjectIdGetDatum(namespaceid));
529
530 conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
531 NULL, 2, skey);
532
533 found = (HeapTupleIsValid(systable_getnext(conscan)));
534
535 systable_endscan(conscan);
536 }
537
538 if (!found)
539 break;
540
541 /* found a conflict, so try a new name component */
542 pfree(conname);
543 snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
544 }
545
546 table_close(conDesc, AccessShareLock);
547
548 return conname;
549 }
550
551 /*
552 * Delete a single constraint record.
553 */
554 void
RemoveConstraintById(Oid conId)555 RemoveConstraintById(Oid conId)
556 {
557 Relation conDesc;
558 HeapTuple tup;
559 Form_pg_constraint con;
560
561 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
562
563 tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conId));
564 if (!HeapTupleIsValid(tup)) /* should not happen */
565 elog(ERROR, "cache lookup failed for constraint %u", conId);
566 con = (Form_pg_constraint) GETSTRUCT(tup);
567
568 /*
569 * Special processing depending on what the constraint is for.
570 */
571 if (OidIsValid(con->conrelid))
572 {
573 Relation rel;
574
575 /*
576 * If the constraint is for a relation, open and exclusive-lock the
577 * relation it's for.
578 */
579 rel = table_open(con->conrelid, AccessExclusiveLock);
580
581 /*
582 * We need to update the relchecks count if it is a check constraint
583 * being dropped. This update will force backends to rebuild relcache
584 * entries when we commit.
585 */
586 if (con->contype == CONSTRAINT_CHECK)
587 {
588 Relation pgrel;
589 HeapTuple relTup;
590 Form_pg_class classForm;
591
592 pgrel = table_open(RelationRelationId, RowExclusiveLock);
593 relTup = SearchSysCacheCopy1(RELOID,
594 ObjectIdGetDatum(con->conrelid));
595 if (!HeapTupleIsValid(relTup))
596 elog(ERROR, "cache lookup failed for relation %u",
597 con->conrelid);
598 classForm = (Form_pg_class) GETSTRUCT(relTup);
599
600 if (classForm->relchecks == 0) /* should not happen */
601 elog(ERROR, "relation \"%s\" has relchecks = 0",
602 RelationGetRelationName(rel));
603 classForm->relchecks--;
604
605 CatalogTupleUpdate(pgrel, &relTup->t_self, relTup);
606
607 heap_freetuple(relTup);
608
609 table_close(pgrel, RowExclusiveLock);
610 }
611
612 /* Keep lock on constraint's rel until end of xact */
613 table_close(rel, NoLock);
614 }
615 else if (OidIsValid(con->contypid))
616 {
617 /*
618 * XXX for now, do nothing special when dropping a domain constraint
619 *
620 * Probably there should be some form of locking on the domain type,
621 * but we have no such concept at the moment.
622 */
623 }
624 else
625 elog(ERROR, "constraint %u is not of a known type", conId);
626
627 /* Fry the constraint itself */
628 CatalogTupleDelete(conDesc, &tup->t_self);
629
630 /* Clean up */
631 ReleaseSysCache(tup);
632 table_close(conDesc, RowExclusiveLock);
633 }
634
635 /*
636 * RenameConstraintById
637 * Rename a constraint.
638 *
639 * Note: this isn't intended to be a user-exposed function; it doesn't check
640 * permissions etc. Currently this is only invoked when renaming an index
641 * that is associated with a constraint, but it's made a little more general
642 * than that with the expectation of someday having ALTER TABLE RENAME
643 * CONSTRAINT.
644 */
645 void
RenameConstraintById(Oid conId,const char * newname)646 RenameConstraintById(Oid conId, const char *newname)
647 {
648 Relation conDesc;
649 HeapTuple tuple;
650 Form_pg_constraint con;
651
652 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
653
654 tuple = SearchSysCacheCopy1(CONSTROID, ObjectIdGetDatum(conId));
655 if (!HeapTupleIsValid(tuple))
656 elog(ERROR, "cache lookup failed for constraint %u", conId);
657 con = (Form_pg_constraint) GETSTRUCT(tuple);
658
659 /*
660 * For user-friendliness, check whether the name is already in use.
661 */
662 if (OidIsValid(con->conrelid) &&
663 ConstraintNameIsUsed(CONSTRAINT_RELATION,
664 con->conrelid,
665 newname))
666 ereport(ERROR,
667 (errcode(ERRCODE_DUPLICATE_OBJECT),
668 errmsg("constraint \"%s\" for relation \"%s\" already exists",
669 newname, get_rel_name(con->conrelid))));
670 if (OidIsValid(con->contypid) &&
671 ConstraintNameIsUsed(CONSTRAINT_DOMAIN,
672 con->contypid,
673 newname))
674 ereport(ERROR,
675 (errcode(ERRCODE_DUPLICATE_OBJECT),
676 errmsg("constraint \"%s\" for domain %s already exists",
677 newname, format_type_be(con->contypid))));
678
679 /* OK, do the rename --- tuple is a copy, so OK to scribble on it */
680 namestrcpy(&(con->conname), newname);
681
682 CatalogTupleUpdate(conDesc, &tuple->t_self, tuple);
683
684 InvokeObjectPostAlterHook(ConstraintRelationId, conId, 0);
685
686 heap_freetuple(tuple);
687 table_close(conDesc, RowExclusiveLock);
688 }
689
690 /*
691 * AlterConstraintNamespaces
692 * Find any constraints belonging to the specified object,
693 * and move them to the specified new namespace.
694 *
695 * isType indicates whether the owning object is a type or a relation.
696 */
697 void
AlterConstraintNamespaces(Oid ownerId,Oid oldNspId,Oid newNspId,bool isType,ObjectAddresses * objsMoved)698 AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
699 Oid newNspId, bool isType, ObjectAddresses *objsMoved)
700 {
701 Relation conRel;
702 ScanKeyData key[2];
703 SysScanDesc scan;
704 HeapTuple tup;
705
706 conRel = table_open(ConstraintRelationId, RowExclusiveLock);
707
708 ScanKeyInit(&key[0],
709 Anum_pg_constraint_conrelid,
710 BTEqualStrategyNumber, F_OIDEQ,
711 ObjectIdGetDatum(isType ? InvalidOid : ownerId));
712 ScanKeyInit(&key[1],
713 Anum_pg_constraint_contypid,
714 BTEqualStrategyNumber, F_OIDEQ,
715 ObjectIdGetDatum(isType ? ownerId : InvalidOid));
716
717 scan = systable_beginscan(conRel, ConstraintRelidTypidNameIndexId, true,
718 NULL, 2, key);
719
720 while (HeapTupleIsValid((tup = systable_getnext(scan))))
721 {
722 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
723 ObjectAddress thisobj;
724
725 thisobj.classId = ConstraintRelationId;
726 thisobj.objectId = conform->oid;
727 thisobj.objectSubId = 0;
728
729 if (object_address_present(&thisobj, objsMoved))
730 continue;
731
732 /* Don't update if the object is already part of the namespace */
733 if (conform->connamespace == oldNspId && oldNspId != newNspId)
734 {
735 tup = heap_copytuple(tup);
736 conform = (Form_pg_constraint) GETSTRUCT(tup);
737
738 conform->connamespace = newNspId;
739
740 CatalogTupleUpdate(conRel, &tup->t_self, tup);
741
742 /*
743 * Note: currently, the constraint will not have its own
744 * dependency on the namespace, so we don't need to do
745 * changeDependencyFor().
746 */
747 }
748
749 InvokeObjectPostAlterHook(ConstraintRelationId, thisobj.objectId, 0);
750
751 add_exact_object_address(&thisobj, objsMoved);
752 }
753
754 systable_endscan(scan);
755
756 table_close(conRel, RowExclusiveLock);
757 }
758
759 /*
760 * ConstraintSetParentConstraint
761 * Set a partition's constraint as child of its parent constraint,
762 * or remove the linkage if parentConstrId is InvalidOid.
763 *
764 * This updates the constraint's pg_constraint row to show it as inherited, and
765 * adds PARTITION dependencies to prevent the constraint from being deleted
766 * on its own. Alternatively, reverse that.
767 */
768 void
ConstraintSetParentConstraint(Oid childConstrId,Oid parentConstrId,Oid childTableId)769 ConstraintSetParentConstraint(Oid childConstrId,
770 Oid parentConstrId,
771 Oid childTableId)
772 {
773 Relation constrRel;
774 Form_pg_constraint constrForm;
775 HeapTuple tuple,
776 newtup;
777 ObjectAddress depender;
778 ObjectAddress referenced;
779
780 constrRel = table_open(ConstraintRelationId, RowExclusiveLock);
781 tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(childConstrId));
782 if (!HeapTupleIsValid(tuple))
783 elog(ERROR, "cache lookup failed for constraint %u", childConstrId);
784 newtup = heap_copytuple(tuple);
785 constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
786 if (OidIsValid(parentConstrId))
787 {
788 /* don't allow setting parent for a constraint that already has one */
789 Assert(constrForm->coninhcount == 0);
790 if (constrForm->conparentid != InvalidOid)
791 elog(ERROR, "constraint %u already has a parent constraint",
792 childConstrId);
793
794 constrForm->conislocal = false;
795 constrForm->coninhcount++;
796 constrForm->conparentid = parentConstrId;
797
798 CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
799
800 ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
801
802 ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
803 recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_PRI);
804
805 ObjectAddressSet(referenced, RelationRelationId, childTableId);
806 recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_SEC);
807 }
808 else
809 {
810 constrForm->coninhcount--;
811 constrForm->conislocal = true;
812 constrForm->conparentid = InvalidOid;
813
814 /* Make sure there's no further inheritance. */
815 Assert(constrForm->coninhcount == 0);
816
817 CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
818
819 deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
820 ConstraintRelationId,
821 DEPENDENCY_PARTITION_PRI);
822 deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
823 RelationRelationId,
824 DEPENDENCY_PARTITION_SEC);
825 }
826
827 ReleaseSysCache(tuple);
828 table_close(constrRel, RowExclusiveLock);
829 }
830
831
832 /*
833 * get_relation_constraint_oid
834 * Find a constraint on the specified relation with the specified name.
835 * Returns constraint's OID.
836 */
837 Oid
get_relation_constraint_oid(Oid relid,const char * conname,bool missing_ok)838 get_relation_constraint_oid(Oid relid, const char *conname, bool missing_ok)
839 {
840 Relation pg_constraint;
841 HeapTuple tuple;
842 SysScanDesc scan;
843 ScanKeyData skey[3];
844 Oid conOid = InvalidOid;
845
846 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
847
848 ScanKeyInit(&skey[0],
849 Anum_pg_constraint_conrelid,
850 BTEqualStrategyNumber, F_OIDEQ,
851 ObjectIdGetDatum(relid));
852 ScanKeyInit(&skey[1],
853 Anum_pg_constraint_contypid,
854 BTEqualStrategyNumber, F_OIDEQ,
855 ObjectIdGetDatum(InvalidOid));
856 ScanKeyInit(&skey[2],
857 Anum_pg_constraint_conname,
858 BTEqualStrategyNumber, F_NAMEEQ,
859 CStringGetDatum(conname));
860
861 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
862 NULL, 3, skey);
863
864 /* There can be at most one matching row */
865 if (HeapTupleIsValid(tuple = systable_getnext(scan)))
866 conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
867
868 systable_endscan(scan);
869
870 /* If no such constraint exists, complain */
871 if (!OidIsValid(conOid) && !missing_ok)
872 ereport(ERROR,
873 (errcode(ERRCODE_UNDEFINED_OBJECT),
874 errmsg("constraint \"%s\" for table \"%s\" does not exist",
875 conname, get_rel_name(relid))));
876
877 table_close(pg_constraint, AccessShareLock);
878
879 return conOid;
880 }
881
882 /*
883 * get_relation_constraint_attnos
884 * Find a constraint on the specified relation with the specified name
885 * and return the constrained columns.
886 *
887 * Returns a Bitmapset of the column attnos of the constrained columns, with
888 * attnos being offset by FirstLowInvalidHeapAttributeNumber so that system
889 * columns can be represented.
890 *
891 * *constraintOid is set to the OID of the constraint, or InvalidOid on
892 * failure.
893 */
894 Bitmapset *
get_relation_constraint_attnos(Oid relid,const char * conname,bool missing_ok,Oid * constraintOid)895 get_relation_constraint_attnos(Oid relid, const char *conname,
896 bool missing_ok, Oid *constraintOid)
897 {
898 Bitmapset *conattnos = NULL;
899 Relation pg_constraint;
900 HeapTuple tuple;
901 SysScanDesc scan;
902 ScanKeyData skey[3];
903
904 /* Set *constraintOid, to avoid complaints about uninitialized vars */
905 *constraintOid = InvalidOid;
906
907 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
908
909 ScanKeyInit(&skey[0],
910 Anum_pg_constraint_conrelid,
911 BTEqualStrategyNumber, F_OIDEQ,
912 ObjectIdGetDatum(relid));
913 ScanKeyInit(&skey[1],
914 Anum_pg_constraint_contypid,
915 BTEqualStrategyNumber, F_OIDEQ,
916 ObjectIdGetDatum(InvalidOid));
917 ScanKeyInit(&skey[2],
918 Anum_pg_constraint_conname,
919 BTEqualStrategyNumber, F_NAMEEQ,
920 CStringGetDatum(conname));
921
922 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
923 NULL, 3, skey);
924
925 /* There can be at most one matching row */
926 if (HeapTupleIsValid(tuple = systable_getnext(scan)))
927 {
928 Datum adatum;
929 bool isNull;
930
931 *constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
932
933 /* Extract the conkey array, ie, attnums of constrained columns */
934 adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
935 RelationGetDescr(pg_constraint), &isNull);
936 if (!isNull)
937 {
938 ArrayType *arr;
939 int numcols;
940 int16 *attnums;
941 int i;
942
943 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
944 numcols = ARR_DIMS(arr)[0];
945 if (ARR_NDIM(arr) != 1 ||
946 numcols < 0 ||
947 ARR_HASNULL(arr) ||
948 ARR_ELEMTYPE(arr) != INT2OID)
949 elog(ERROR, "conkey is not a 1-D smallint array");
950 attnums = (int16 *) ARR_DATA_PTR(arr);
951
952 /* Construct the result value */
953 for (i = 0; i < numcols; i++)
954 {
955 conattnos = bms_add_member(conattnos,
956 attnums[i] - FirstLowInvalidHeapAttributeNumber);
957 }
958 }
959 }
960
961 systable_endscan(scan);
962
963 /* If no such constraint exists, complain */
964 if (!OidIsValid(*constraintOid) && !missing_ok)
965 ereport(ERROR,
966 (errcode(ERRCODE_UNDEFINED_OBJECT),
967 errmsg("constraint \"%s\" for table \"%s\" does not exist",
968 conname, get_rel_name(relid))));
969
970 table_close(pg_constraint, AccessShareLock);
971
972 return conattnos;
973 }
974
975 /*
976 * Return the OID of the constraint associated with the given index in the
977 * given relation; or InvalidOid if no such index is catalogued.
978 */
979 Oid
get_relation_idx_constraint_oid(Oid relationId,Oid indexId)980 get_relation_idx_constraint_oid(Oid relationId, Oid indexId)
981 {
982 Relation pg_constraint;
983 SysScanDesc scan;
984 ScanKeyData key;
985 HeapTuple tuple;
986 Oid constraintId = InvalidOid;
987
988 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
989
990 ScanKeyInit(&key,
991 Anum_pg_constraint_conrelid,
992 BTEqualStrategyNumber,
993 F_OIDEQ,
994 ObjectIdGetDatum(relationId));
995 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
996 true, NULL, 1, &key);
997 while ((tuple = systable_getnext(scan)) != NULL)
998 {
999 Form_pg_constraint constrForm;
1000
1001 constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
1002 if (constrForm->conindid == indexId)
1003 {
1004 constraintId = constrForm->oid;
1005 break;
1006 }
1007 }
1008 systable_endscan(scan);
1009
1010 table_close(pg_constraint, AccessShareLock);
1011 return constraintId;
1012 }
1013
1014 /*
1015 * get_domain_constraint_oid
1016 * Find a constraint on the specified domain with the specified name.
1017 * Returns constraint's OID.
1018 */
1019 Oid
get_domain_constraint_oid(Oid typid,const char * conname,bool missing_ok)1020 get_domain_constraint_oid(Oid typid, const char *conname, bool missing_ok)
1021 {
1022 Relation pg_constraint;
1023 HeapTuple tuple;
1024 SysScanDesc scan;
1025 ScanKeyData skey[3];
1026 Oid conOid = InvalidOid;
1027
1028 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1029
1030 ScanKeyInit(&skey[0],
1031 Anum_pg_constraint_conrelid,
1032 BTEqualStrategyNumber, F_OIDEQ,
1033 ObjectIdGetDatum(InvalidOid));
1034 ScanKeyInit(&skey[1],
1035 Anum_pg_constraint_contypid,
1036 BTEqualStrategyNumber, F_OIDEQ,
1037 ObjectIdGetDatum(typid));
1038 ScanKeyInit(&skey[2],
1039 Anum_pg_constraint_conname,
1040 BTEqualStrategyNumber, F_NAMEEQ,
1041 CStringGetDatum(conname));
1042
1043 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1044 NULL, 3, skey);
1045
1046 /* There can be at most one matching row */
1047 if (HeapTupleIsValid(tuple = systable_getnext(scan)))
1048 conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1049
1050 systable_endscan(scan);
1051
1052 /* If no such constraint exists, complain */
1053 if (!OidIsValid(conOid) && !missing_ok)
1054 ereport(ERROR,
1055 (errcode(ERRCODE_UNDEFINED_OBJECT),
1056 errmsg("constraint \"%s\" for domain %s does not exist",
1057 conname, format_type_be(typid))));
1058
1059 table_close(pg_constraint, AccessShareLock);
1060
1061 return conOid;
1062 }
1063
1064 /*
1065 * get_primary_key_attnos
1066 * Identify the columns in a relation's primary key, if any.
1067 *
1068 * Returns a Bitmapset of the column attnos of the primary key's columns,
1069 * with attnos being offset by FirstLowInvalidHeapAttributeNumber so that
1070 * system columns can be represented.
1071 *
1072 * If there is no primary key, return NULL. We also return NULL if the pkey
1073 * constraint is deferrable and deferrableOk is false.
1074 *
1075 * *constraintOid is set to the OID of the pkey constraint, or InvalidOid
1076 * on failure.
1077 */
1078 Bitmapset *
get_primary_key_attnos(Oid relid,bool deferrableOk,Oid * constraintOid)1079 get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid)
1080 {
1081 Bitmapset *pkattnos = NULL;
1082 Relation pg_constraint;
1083 HeapTuple tuple;
1084 SysScanDesc scan;
1085 ScanKeyData skey[1];
1086
1087 /* Set *constraintOid, to avoid complaints about uninitialized vars */
1088 *constraintOid = InvalidOid;
1089
1090 /* Scan pg_constraint for constraints of the target rel */
1091 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1092
1093 ScanKeyInit(&skey[0],
1094 Anum_pg_constraint_conrelid,
1095 BTEqualStrategyNumber, F_OIDEQ,
1096 ObjectIdGetDatum(relid));
1097
1098 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1099 NULL, 1, skey);
1100
1101 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1102 {
1103 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
1104 Datum adatum;
1105 bool isNull;
1106 ArrayType *arr;
1107 int16 *attnums;
1108 int numkeys;
1109 int i;
1110
1111 /* Skip constraints that are not PRIMARY KEYs */
1112 if (con->contype != CONSTRAINT_PRIMARY)
1113 continue;
1114
1115 /*
1116 * If the primary key is deferrable, but we've been instructed to
1117 * ignore deferrable constraints, then we might as well give up
1118 * searching, since there can only be a single primary key on a table.
1119 */
1120 if (con->condeferrable && !deferrableOk)
1121 break;
1122
1123 /* Extract the conkey array, ie, attnums of PK's columns */
1124 adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
1125 RelationGetDescr(pg_constraint), &isNull);
1126 if (isNull)
1127 elog(ERROR, "null conkey for constraint %u",
1128 ((Form_pg_constraint) GETSTRUCT(tuple))->oid);
1129 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1130 numkeys = ARR_DIMS(arr)[0];
1131 if (ARR_NDIM(arr) != 1 ||
1132 numkeys < 0 ||
1133 ARR_HASNULL(arr) ||
1134 ARR_ELEMTYPE(arr) != INT2OID)
1135 elog(ERROR, "conkey is not a 1-D smallint array");
1136 attnums = (int16 *) ARR_DATA_PTR(arr);
1137
1138 /* Construct the result value */
1139 for (i = 0; i < numkeys; i++)
1140 {
1141 pkattnos = bms_add_member(pkattnos,
1142 attnums[i] - FirstLowInvalidHeapAttributeNumber);
1143 }
1144 *constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1145
1146 /* No need to search further */
1147 break;
1148 }
1149
1150 systable_endscan(scan);
1151
1152 table_close(pg_constraint, AccessShareLock);
1153
1154 return pkattnos;
1155 }
1156
1157 /*
1158 * Extract data from the pg_constraint tuple of a foreign-key constraint.
1159 *
1160 * All arguments save the first are output arguments; the last three of them
1161 * can be passed as NULL if caller doesn't need them.
1162 */
1163 void
DeconstructFkConstraintRow(HeapTuple tuple,int * numfks,AttrNumber * conkey,AttrNumber * confkey,Oid * pf_eq_oprs,Oid * pp_eq_oprs,Oid * ff_eq_oprs)1164 DeconstructFkConstraintRow(HeapTuple tuple, int *numfks,
1165 AttrNumber *conkey, AttrNumber *confkey,
1166 Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs)
1167 {
1168 Oid constrId;
1169 Datum adatum;
1170 bool isNull;
1171 ArrayType *arr;
1172 int numkeys;
1173
1174 constrId = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1175
1176 /*
1177 * We expect the arrays to be 1-D arrays of the right types; verify that.
1178 * We don't need to use deconstruct_array() since the array data is just
1179 * going to look like a C array of values.
1180 */
1181 adatum = SysCacheGetAttr(CONSTROID, tuple,
1182 Anum_pg_constraint_conkey, &isNull);
1183 if (isNull)
1184 elog(ERROR, "null conkey for constraint %u", constrId);
1185 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1186 if (ARR_NDIM(arr) != 1 ||
1187 ARR_HASNULL(arr) ||
1188 ARR_ELEMTYPE(arr) != INT2OID)
1189 elog(ERROR, "conkey is not a 1-D smallint array");
1190 numkeys = ARR_DIMS(arr)[0];
1191 if (numkeys <= 0 || numkeys > INDEX_MAX_KEYS)
1192 elog(ERROR, "foreign key constraint cannot have %d columns", numkeys);
1193 memcpy(conkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1194 if ((Pointer) arr != DatumGetPointer(adatum))
1195 pfree(arr); /* free de-toasted copy, if any */
1196
1197 adatum = SysCacheGetAttr(CONSTROID, tuple,
1198 Anum_pg_constraint_confkey, &isNull);
1199 if (isNull)
1200 elog(ERROR, "null confkey for constraint %u", constrId);
1201 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1202 if (ARR_NDIM(arr) != 1 ||
1203 ARR_DIMS(arr)[0] != numkeys ||
1204 ARR_HASNULL(arr) ||
1205 ARR_ELEMTYPE(arr) != INT2OID)
1206 elog(ERROR, "confkey is not a 1-D smallint array");
1207 memcpy(confkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1208 if ((Pointer) arr != DatumGetPointer(adatum))
1209 pfree(arr); /* free de-toasted copy, if any */
1210
1211 if (pf_eq_oprs)
1212 {
1213 adatum = SysCacheGetAttr(CONSTROID, tuple,
1214 Anum_pg_constraint_conpfeqop, &isNull);
1215 if (isNull)
1216 elog(ERROR, "null conpfeqop for constraint %u", constrId);
1217 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1218 /* see TryReuseForeignKey if you change the test below */
1219 if (ARR_NDIM(arr) != 1 ||
1220 ARR_DIMS(arr)[0] != numkeys ||
1221 ARR_HASNULL(arr) ||
1222 ARR_ELEMTYPE(arr) != OIDOID)
1223 elog(ERROR, "conpfeqop is not a 1-D Oid array");
1224 memcpy(pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1225 if ((Pointer) arr != DatumGetPointer(adatum))
1226 pfree(arr); /* free de-toasted copy, if any */
1227 }
1228
1229 if (pp_eq_oprs)
1230 {
1231 adatum = SysCacheGetAttr(CONSTROID, tuple,
1232 Anum_pg_constraint_conppeqop, &isNull);
1233 if (isNull)
1234 elog(ERROR, "null conppeqop for constraint %u", constrId);
1235 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1236 if (ARR_NDIM(arr) != 1 ||
1237 ARR_DIMS(arr)[0] != numkeys ||
1238 ARR_HASNULL(arr) ||
1239 ARR_ELEMTYPE(arr) != OIDOID)
1240 elog(ERROR, "conppeqop is not a 1-D Oid array");
1241 memcpy(pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1242 if ((Pointer) arr != DatumGetPointer(adatum))
1243 pfree(arr); /* free de-toasted copy, if any */
1244 }
1245
1246 if (ff_eq_oprs)
1247 {
1248 adatum = SysCacheGetAttr(CONSTROID, tuple,
1249 Anum_pg_constraint_conffeqop, &isNull);
1250 if (isNull)
1251 elog(ERROR, "null conffeqop for constraint %u", constrId);
1252 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1253 if (ARR_NDIM(arr) != 1 ||
1254 ARR_DIMS(arr)[0] != numkeys ||
1255 ARR_HASNULL(arr) ||
1256 ARR_ELEMTYPE(arr) != OIDOID)
1257 elog(ERROR, "conffeqop is not a 1-D Oid array");
1258 memcpy(ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1259 if ((Pointer) arr != DatumGetPointer(adatum))
1260 pfree(arr); /* free de-toasted copy, if any */
1261 }
1262
1263 *numfks = numkeys;
1264 }
1265
1266 /*
1267 * Determine whether a relation can be proven functionally dependent on
1268 * a set of grouping columns. If so, return true and add the pg_constraint
1269 * OIDs of the constraints needed for the proof to the *constraintDeps list.
1270 *
1271 * grouping_columns is a list of grouping expressions, in which columns of
1272 * the rel of interest are Vars with the indicated varno/varlevelsup.
1273 *
1274 * Currently we only check to see if the rel has a primary key that is a
1275 * subset of the grouping_columns. We could also use plain unique constraints
1276 * if all their columns are known not null, but there's a problem: we need
1277 * to be able to represent the not-null-ness as part of the constraints added
1278 * to *constraintDeps. FIXME whenever not-null constraints get represented
1279 * in pg_constraint.
1280 */
1281 bool
check_functional_grouping(Oid relid,Index varno,Index varlevelsup,List * grouping_columns,List ** constraintDeps)1282 check_functional_grouping(Oid relid,
1283 Index varno, Index varlevelsup,
1284 List *grouping_columns,
1285 List **constraintDeps)
1286 {
1287 Bitmapset *pkattnos;
1288 Bitmapset *groupbyattnos;
1289 Oid constraintOid;
1290 ListCell *gl;
1291
1292 /* If the rel has no PK, then we can't prove functional dependency */
1293 pkattnos = get_primary_key_attnos(relid, false, &constraintOid);
1294 if (pkattnos == NULL)
1295 return false;
1296
1297 /* Identify all the rel's columns that appear in grouping_columns */
1298 groupbyattnos = NULL;
1299 foreach(gl, grouping_columns)
1300 {
1301 Var *gvar = (Var *) lfirst(gl);
1302
1303 if (IsA(gvar, Var) &&
1304 gvar->varno == varno &&
1305 gvar->varlevelsup == varlevelsup)
1306 groupbyattnos = bms_add_member(groupbyattnos,
1307 gvar->varattno - FirstLowInvalidHeapAttributeNumber);
1308 }
1309
1310 if (bms_is_subset(pkattnos, groupbyattnos))
1311 {
1312 /* The PK is a subset of grouping_columns, so we win */
1313 *constraintDeps = lappend_oid(*constraintDeps, constraintOid);
1314 return true;
1315 }
1316
1317 return false;
1318 }
1319