1 /*-------------------------------------------------------------------------
2 *
3 * pg_constraint.c
4 * routines to support manipulation of the pg_constraint relation
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/catalog/pg_constraint.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/sysattr.h"
20 #include "access/table.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/indexing.h"
25 #include "catalog/objectaccess.h"
26 #include "catalog/pg_constraint.h"
27 #include "catalog/pg_operator.h"
28 #include "catalog/pg_type.h"
29 #include "commands/defrem.h"
30 #include "commands/tablecmds.h"
31 #include "utils/array.h"
32 #include "utils/builtins.h"
33 #include "utils/fmgroids.h"
34 #include "utils/lsyscache.h"
35 #include "utils/rel.h"
36 #include "utils/syscache.h"
37
38
39 /*
40 * CreateConstraintEntry
41 * Create a constraint table entry.
42 *
43 * Subsidiary records (such as triggers or indexes to implement the
44 * constraint) are *not* created here. But we do make dependency links
45 * from the constraint to the things it depends on.
46 *
47 * The new constraint's OID is returned.
48 */
49 Oid
CreateConstraintEntry(const char * constraintName,Oid constraintNamespace,char constraintType,bool isDeferrable,bool isDeferred,bool isValidated,Oid parentConstrId,Oid relId,const int16 * constraintKey,int constraintNKeys,int constraintNTotalKeys,Oid domainId,Oid indexRelId,Oid foreignRelId,const int16 * foreignKey,const Oid * pfEqOp,const Oid * ppEqOp,const Oid * ffEqOp,int foreignNKeys,char foreignUpdateType,char foreignDeleteType,char foreignMatchType,const Oid * exclOp,Node * conExpr,const char * conBin,bool conIsLocal,int conInhCount,bool conNoInherit,bool is_internal)50 CreateConstraintEntry(const char *constraintName,
51 Oid constraintNamespace,
52 char constraintType,
53 bool isDeferrable,
54 bool isDeferred,
55 bool isValidated,
56 Oid parentConstrId,
57 Oid relId,
58 const int16 *constraintKey,
59 int constraintNKeys,
60 int constraintNTotalKeys,
61 Oid domainId,
62 Oid indexRelId,
63 Oid foreignRelId,
64 const int16 *foreignKey,
65 const Oid *pfEqOp,
66 const Oid *ppEqOp,
67 const Oid *ffEqOp,
68 int foreignNKeys,
69 char foreignUpdateType,
70 char foreignDeleteType,
71 char foreignMatchType,
72 const Oid *exclOp,
73 Node *conExpr,
74 const char *conBin,
75 bool conIsLocal,
76 int conInhCount,
77 bool conNoInherit,
78 bool is_internal)
79 {
80 Relation conDesc;
81 Oid conOid;
82 HeapTuple tup;
83 bool nulls[Natts_pg_constraint];
84 Datum values[Natts_pg_constraint];
85 ArrayType *conkeyArray;
86 ArrayType *confkeyArray;
87 ArrayType *conpfeqopArray;
88 ArrayType *conppeqopArray;
89 ArrayType *conffeqopArray;
90 ArrayType *conexclopArray;
91 NameData cname;
92 int i;
93 ObjectAddress conobject;
94 ObjectAddresses *addrs_auto;
95 ObjectAddresses *addrs_normal;
96
97 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
98
99 Assert(constraintName);
100 namestrcpy(&cname, constraintName);
101
102 /*
103 * Convert C arrays into Postgres arrays.
104 */
105 if (constraintNKeys > 0)
106 {
107 Datum *conkey;
108
109 conkey = (Datum *) palloc(constraintNKeys * sizeof(Datum));
110 for (i = 0; i < constraintNKeys; i++)
111 conkey[i] = Int16GetDatum(constraintKey[i]);
112 conkeyArray = construct_array(conkey, constraintNKeys,
113 INT2OID, 2, true, TYPALIGN_SHORT);
114 }
115 else
116 conkeyArray = NULL;
117
118 if (foreignNKeys > 0)
119 {
120 Datum *fkdatums;
121
122 fkdatums = (Datum *) palloc(foreignNKeys * sizeof(Datum));
123 for (i = 0; i < foreignNKeys; i++)
124 fkdatums[i] = Int16GetDatum(foreignKey[i]);
125 confkeyArray = construct_array(fkdatums, foreignNKeys,
126 INT2OID, 2, true, TYPALIGN_SHORT);
127 for (i = 0; i < foreignNKeys; i++)
128 fkdatums[i] = ObjectIdGetDatum(pfEqOp[i]);
129 conpfeqopArray = construct_array(fkdatums, foreignNKeys,
130 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
131 for (i = 0; i < foreignNKeys; i++)
132 fkdatums[i] = ObjectIdGetDatum(ppEqOp[i]);
133 conppeqopArray = construct_array(fkdatums, foreignNKeys,
134 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
135 for (i = 0; i < foreignNKeys; i++)
136 fkdatums[i] = ObjectIdGetDatum(ffEqOp[i]);
137 conffeqopArray = construct_array(fkdatums, foreignNKeys,
138 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
139 }
140 else
141 {
142 confkeyArray = NULL;
143 conpfeqopArray = NULL;
144 conppeqopArray = NULL;
145 conffeqopArray = NULL;
146 }
147
148 if (exclOp != NULL)
149 {
150 Datum *opdatums;
151
152 opdatums = (Datum *) palloc(constraintNKeys * sizeof(Datum));
153 for (i = 0; i < constraintNKeys; i++)
154 opdatums[i] = ObjectIdGetDatum(exclOp[i]);
155 conexclopArray = construct_array(opdatums, constraintNKeys,
156 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
157 }
158 else
159 conexclopArray = NULL;
160
161 /* initialize nulls and values */
162 for (i = 0; i < Natts_pg_constraint; i++)
163 {
164 nulls[i] = false;
165 values[i] = (Datum) NULL;
166 }
167
168 conOid = GetNewOidWithIndex(conDesc, ConstraintOidIndexId,
169 Anum_pg_constraint_oid);
170 values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(conOid);
171 values[Anum_pg_constraint_conname - 1] = NameGetDatum(&cname);
172 values[Anum_pg_constraint_connamespace - 1] = ObjectIdGetDatum(constraintNamespace);
173 values[Anum_pg_constraint_contype - 1] = CharGetDatum(constraintType);
174 values[Anum_pg_constraint_condeferrable - 1] = BoolGetDatum(isDeferrable);
175 values[Anum_pg_constraint_condeferred - 1] = BoolGetDatum(isDeferred);
176 values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValidated);
177 values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(relId);
178 values[Anum_pg_constraint_contypid - 1] = ObjectIdGetDatum(domainId);
179 values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(indexRelId);
180 values[Anum_pg_constraint_conparentid - 1] = ObjectIdGetDatum(parentConstrId);
181 values[Anum_pg_constraint_confrelid - 1] = ObjectIdGetDatum(foreignRelId);
182 values[Anum_pg_constraint_confupdtype - 1] = CharGetDatum(foreignUpdateType);
183 values[Anum_pg_constraint_confdeltype - 1] = CharGetDatum(foreignDeleteType);
184 values[Anum_pg_constraint_confmatchtype - 1] = CharGetDatum(foreignMatchType);
185 values[Anum_pg_constraint_conislocal - 1] = BoolGetDatum(conIsLocal);
186 values[Anum_pg_constraint_coninhcount - 1] = Int32GetDatum(conInhCount);
187 values[Anum_pg_constraint_connoinherit - 1] = BoolGetDatum(conNoInherit);
188
189 if (conkeyArray)
190 values[Anum_pg_constraint_conkey - 1] = PointerGetDatum(conkeyArray);
191 else
192 nulls[Anum_pg_constraint_conkey - 1] = true;
193
194 if (confkeyArray)
195 values[Anum_pg_constraint_confkey - 1] = PointerGetDatum(confkeyArray);
196 else
197 nulls[Anum_pg_constraint_confkey - 1] = true;
198
199 if (conpfeqopArray)
200 values[Anum_pg_constraint_conpfeqop - 1] = PointerGetDatum(conpfeqopArray);
201 else
202 nulls[Anum_pg_constraint_conpfeqop - 1] = true;
203
204 if (conppeqopArray)
205 values[Anum_pg_constraint_conppeqop - 1] = PointerGetDatum(conppeqopArray);
206 else
207 nulls[Anum_pg_constraint_conppeqop - 1] = true;
208
209 if (conffeqopArray)
210 values[Anum_pg_constraint_conffeqop - 1] = PointerGetDatum(conffeqopArray);
211 else
212 nulls[Anum_pg_constraint_conffeqop - 1] = true;
213
214 if (conexclopArray)
215 values[Anum_pg_constraint_conexclop - 1] = PointerGetDatum(conexclopArray);
216 else
217 nulls[Anum_pg_constraint_conexclop - 1] = true;
218
219 if (conBin)
220 values[Anum_pg_constraint_conbin - 1] = CStringGetTextDatum(conBin);
221 else
222 nulls[Anum_pg_constraint_conbin - 1] = true;
223
224 tup = heap_form_tuple(RelationGetDescr(conDesc), values, nulls);
225
226 CatalogTupleInsert(conDesc, tup);
227
228 ObjectAddressSet(conobject, ConstraintRelationId, conOid);
229
230 table_close(conDesc, RowExclusiveLock);
231
232 /* Handle set of auto dependencies */
233 addrs_auto = new_object_addresses();
234
235 if (OidIsValid(relId))
236 {
237 /*
238 * Register auto dependency from constraint to owning relation, or to
239 * specific column(s) if any are mentioned.
240 */
241 ObjectAddress relobject;
242
243 if (constraintNTotalKeys > 0)
244 {
245 for (i = 0; i < constraintNTotalKeys; i++)
246 {
247 ObjectAddressSubSet(relobject, RelationRelationId, relId,
248 constraintKey[i]);
249 add_exact_object_address(&relobject, addrs_auto);
250 }
251 }
252 else
253 {
254 ObjectAddressSet(relobject, RelationRelationId, relId);
255 add_exact_object_address(&relobject, addrs_auto);
256 }
257 }
258
259 if (OidIsValid(domainId))
260 {
261 /*
262 * Register auto dependency from constraint to owning domain
263 */
264 ObjectAddress domobject;
265
266 ObjectAddressSet(domobject, TypeRelationId, domainId);
267 add_exact_object_address(&domobject, addrs_auto);
268 }
269
270 record_object_address_dependencies(&conobject, addrs_auto,
271 DEPENDENCY_AUTO);
272 free_object_addresses(addrs_auto);
273
274 /* Handle set of normal dependencies */
275 addrs_normal = new_object_addresses();
276
277 if (OidIsValid(foreignRelId))
278 {
279 /*
280 * Register normal dependency from constraint to foreign relation, or
281 * to specific column(s) if any are mentioned.
282 */
283 ObjectAddress relobject;
284
285 if (foreignNKeys > 0)
286 {
287 for (i = 0; i < foreignNKeys; i++)
288 {
289 ObjectAddressSubSet(relobject, RelationRelationId,
290 foreignRelId, foreignKey[i]);
291 add_exact_object_address(&relobject, addrs_normal);
292 }
293 }
294 else
295 {
296 ObjectAddressSet(relobject, RelationRelationId, foreignRelId);
297 add_exact_object_address(&relobject, addrs_normal);
298 }
299 }
300
301 if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN)
302 {
303 /*
304 * Register normal dependency on the unique index that supports a
305 * foreign-key constraint. (Note: for indexes associated with unique
306 * or primary-key constraints, the dependency runs the other way, and
307 * is not made here.)
308 */
309 ObjectAddress relobject;
310
311 ObjectAddressSet(relobject, RelationRelationId, indexRelId);
312 add_exact_object_address(&relobject, addrs_normal);
313 }
314
315 if (foreignNKeys > 0)
316 {
317 /*
318 * Register normal dependencies on the equality operators that support
319 * a foreign-key constraint. If the PK and FK types are the same then
320 * all three operators for a column are the same; otherwise they are
321 * different.
322 */
323 ObjectAddress oprobject;
324
325 oprobject.classId = OperatorRelationId;
326 oprobject.objectSubId = 0;
327
328 for (i = 0; i < foreignNKeys; i++)
329 {
330 oprobject.objectId = pfEqOp[i];
331 add_exact_object_address(&oprobject, addrs_normal);
332 if (ppEqOp[i] != pfEqOp[i])
333 {
334 oprobject.objectId = ppEqOp[i];
335 add_exact_object_address(&oprobject, addrs_normal);
336 }
337 if (ffEqOp[i] != pfEqOp[i])
338 {
339 oprobject.objectId = ffEqOp[i];
340 add_exact_object_address(&oprobject, addrs_normal);
341 }
342 }
343 }
344
345 record_object_address_dependencies(&conobject, addrs_normal,
346 DEPENDENCY_NORMAL);
347 free_object_addresses(addrs_normal);
348
349 /*
350 * We don't bother to register dependencies on the exclusion operators of
351 * an exclusion constraint. We assume they are members of the opclass
352 * supporting the index, so there's an indirect dependency via that. (This
353 * would be pretty dicey for cross-type operators, but exclusion operators
354 * can never be cross-type.)
355 */
356
357 if (conExpr != NULL)
358 {
359 /*
360 * Register dependencies from constraint to objects mentioned in CHECK
361 * expression.
362 */
363 recordDependencyOnSingleRelExpr(&conobject, conExpr, relId,
364 DEPENDENCY_NORMAL,
365 DEPENDENCY_NORMAL, false);
366 }
367
368 /* Post creation hook for new constraint */
369 InvokeObjectPostCreateHookArg(ConstraintRelationId, conOid, 0,
370 is_internal);
371
372 return conOid;
373 }
374
375 /*
376 * Test whether given name is currently used as a constraint name
377 * for the given object (relation or domain).
378 *
379 * This is used to decide whether to accept a user-specified constraint name.
380 * It is deliberately not the same test as ChooseConstraintName uses to decide
381 * whether an auto-generated name is OK: here, we will allow it unless there
382 * is an identical constraint name in use *on the same object*.
383 *
384 * NB: Caller should hold exclusive lock on the given object, else
385 * this test can be fooled by concurrent additions.
386 */
387 bool
ConstraintNameIsUsed(ConstraintCategory conCat,Oid objId,const char * conname)388 ConstraintNameIsUsed(ConstraintCategory conCat, Oid objId,
389 const char *conname)
390 {
391 bool found;
392 Relation conDesc;
393 SysScanDesc conscan;
394 ScanKeyData skey[3];
395
396 conDesc = table_open(ConstraintRelationId, AccessShareLock);
397
398 ScanKeyInit(&skey[0],
399 Anum_pg_constraint_conrelid,
400 BTEqualStrategyNumber, F_OIDEQ,
401 ObjectIdGetDatum((conCat == CONSTRAINT_RELATION)
402 ? objId : InvalidOid));
403 ScanKeyInit(&skey[1],
404 Anum_pg_constraint_contypid,
405 BTEqualStrategyNumber, F_OIDEQ,
406 ObjectIdGetDatum((conCat == CONSTRAINT_DOMAIN)
407 ? objId : InvalidOid));
408 ScanKeyInit(&skey[2],
409 Anum_pg_constraint_conname,
410 BTEqualStrategyNumber, F_NAMEEQ,
411 CStringGetDatum(conname));
412
413 conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId,
414 true, NULL, 3, skey);
415
416 /* There can be at most one matching row */
417 found = (HeapTupleIsValid(systable_getnext(conscan)));
418
419 systable_endscan(conscan);
420 table_close(conDesc, AccessShareLock);
421
422 return found;
423 }
424
425 /*
426 * Does any constraint of the given name exist in the given namespace?
427 *
428 * This is used for code that wants to match ChooseConstraintName's rule
429 * that we should avoid autogenerating duplicate constraint names within a
430 * namespace.
431 */
432 bool
ConstraintNameExists(const char * conname,Oid namespaceid)433 ConstraintNameExists(const char *conname, Oid namespaceid)
434 {
435 bool found;
436 Relation conDesc;
437 SysScanDesc conscan;
438 ScanKeyData skey[2];
439
440 conDesc = table_open(ConstraintRelationId, AccessShareLock);
441
442 ScanKeyInit(&skey[0],
443 Anum_pg_constraint_conname,
444 BTEqualStrategyNumber, F_NAMEEQ,
445 CStringGetDatum(conname));
446
447 ScanKeyInit(&skey[1],
448 Anum_pg_constraint_connamespace,
449 BTEqualStrategyNumber, F_OIDEQ,
450 ObjectIdGetDatum(namespaceid));
451
452 conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
453 NULL, 2, skey);
454
455 found = (HeapTupleIsValid(systable_getnext(conscan)));
456
457 systable_endscan(conscan);
458 table_close(conDesc, AccessShareLock);
459
460 return found;
461 }
462
463 /*
464 * Select a nonconflicting name for a new constraint.
465 *
466 * The objective here is to choose a name that is unique within the
467 * specified namespace. Postgres does not require this, but the SQL
468 * spec does, and some apps depend on it. Therefore we avoid choosing
469 * default names that so conflict.
470 *
471 * name1, name2, and label are used the same way as for makeObjectName(),
472 * except that the label can't be NULL; digits will be appended to the label
473 * if needed to create a name that is unique within the specified namespace.
474 *
475 * 'others' can be a list of string names already chosen within the current
476 * command (but not yet reflected into the catalogs); we will not choose
477 * a duplicate of one of these either.
478 *
479 * Note: it is theoretically possible to get a collision anyway, if someone
480 * else chooses the same name concurrently. This is fairly unlikely to be
481 * a problem in practice, especially if one is holding an exclusive lock on
482 * the relation identified by name1.
483 *
484 * Returns a palloc'd string.
485 */
486 char *
ChooseConstraintName(const char * name1,const char * name2,const char * label,Oid namespaceid,List * others)487 ChooseConstraintName(const char *name1, const char *name2,
488 const char *label, Oid namespaceid,
489 List *others)
490 {
491 int pass = 0;
492 char *conname = NULL;
493 char modlabel[NAMEDATALEN];
494 Relation conDesc;
495 SysScanDesc conscan;
496 ScanKeyData skey[2];
497 bool found;
498 ListCell *l;
499
500 conDesc = table_open(ConstraintRelationId, AccessShareLock);
501
502 /* try the unmodified label first */
503 strlcpy(modlabel, label, sizeof(modlabel));
504
505 for (;;)
506 {
507 conname = makeObjectName(name1, name2, modlabel);
508
509 found = false;
510
511 foreach(l, others)
512 {
513 if (strcmp((char *) lfirst(l), conname) == 0)
514 {
515 found = true;
516 break;
517 }
518 }
519
520 if (!found)
521 {
522 ScanKeyInit(&skey[0],
523 Anum_pg_constraint_conname,
524 BTEqualStrategyNumber, F_NAMEEQ,
525 CStringGetDatum(conname));
526
527 ScanKeyInit(&skey[1],
528 Anum_pg_constraint_connamespace,
529 BTEqualStrategyNumber, F_OIDEQ,
530 ObjectIdGetDatum(namespaceid));
531
532 conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
533 NULL, 2, skey);
534
535 found = (HeapTupleIsValid(systable_getnext(conscan)));
536
537 systable_endscan(conscan);
538 }
539
540 if (!found)
541 break;
542
543 /* found a conflict, so try a new name component */
544 pfree(conname);
545 snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
546 }
547
548 table_close(conDesc, AccessShareLock);
549
550 return conname;
551 }
552
553 /*
554 * Delete a single constraint record.
555 */
556 void
RemoveConstraintById(Oid conId)557 RemoveConstraintById(Oid conId)
558 {
559 Relation conDesc;
560 HeapTuple tup;
561 Form_pg_constraint con;
562
563 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
564
565 tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conId));
566 if (!HeapTupleIsValid(tup)) /* should not happen */
567 elog(ERROR, "cache lookup failed for constraint %u", conId);
568 con = (Form_pg_constraint) GETSTRUCT(tup);
569
570 /*
571 * Special processing depending on what the constraint is for.
572 */
573 if (OidIsValid(con->conrelid))
574 {
575 Relation rel;
576
577 /*
578 * If the constraint is for a relation, open and exclusive-lock the
579 * relation it's for.
580 */
581 rel = table_open(con->conrelid, AccessExclusiveLock);
582
583 /*
584 * We need to update the relchecks count if it is a check constraint
585 * being dropped. This update will force backends to rebuild relcache
586 * entries when we commit.
587 */
588 if (con->contype == CONSTRAINT_CHECK)
589 {
590 Relation pgrel;
591 HeapTuple relTup;
592 Form_pg_class classForm;
593
594 pgrel = table_open(RelationRelationId, RowExclusiveLock);
595 relTup = SearchSysCacheCopy1(RELOID,
596 ObjectIdGetDatum(con->conrelid));
597 if (!HeapTupleIsValid(relTup))
598 elog(ERROR, "cache lookup failed for relation %u",
599 con->conrelid);
600 classForm = (Form_pg_class) GETSTRUCT(relTup);
601
602 if (classForm->relchecks == 0) /* should not happen */
603 elog(ERROR, "relation \"%s\" has relchecks = 0",
604 RelationGetRelationName(rel));
605 classForm->relchecks--;
606
607 CatalogTupleUpdate(pgrel, &relTup->t_self, relTup);
608
609 heap_freetuple(relTup);
610
611 table_close(pgrel, RowExclusiveLock);
612 }
613
614 /* Keep lock on constraint's rel until end of xact */
615 table_close(rel, NoLock);
616 }
617 else if (OidIsValid(con->contypid))
618 {
619 /*
620 * XXX for now, do nothing special when dropping a domain constraint
621 *
622 * Probably there should be some form of locking on the domain type,
623 * but we have no such concept at the moment.
624 */
625 }
626 else
627 elog(ERROR, "constraint %u is not of a known type", conId);
628
629 /* Fry the constraint itself */
630 CatalogTupleDelete(conDesc, &tup->t_self);
631
632 /* Clean up */
633 ReleaseSysCache(tup);
634 table_close(conDesc, RowExclusiveLock);
635 }
636
637 /*
638 * RenameConstraintById
639 * Rename a constraint.
640 *
641 * Note: this isn't intended to be a user-exposed function; it doesn't check
642 * permissions etc. Currently this is only invoked when renaming an index
643 * that is associated with a constraint, but it's made a little more general
644 * than that with the expectation of someday having ALTER TABLE RENAME
645 * CONSTRAINT.
646 */
647 void
RenameConstraintById(Oid conId,const char * newname)648 RenameConstraintById(Oid conId, const char *newname)
649 {
650 Relation conDesc;
651 HeapTuple tuple;
652 Form_pg_constraint con;
653
654 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
655
656 tuple = SearchSysCacheCopy1(CONSTROID, ObjectIdGetDatum(conId));
657 if (!HeapTupleIsValid(tuple))
658 elog(ERROR, "cache lookup failed for constraint %u", conId);
659 con = (Form_pg_constraint) GETSTRUCT(tuple);
660
661 /*
662 * For user-friendliness, check whether the name is already in use.
663 */
664 if (OidIsValid(con->conrelid) &&
665 ConstraintNameIsUsed(CONSTRAINT_RELATION,
666 con->conrelid,
667 newname))
668 ereport(ERROR,
669 (errcode(ERRCODE_DUPLICATE_OBJECT),
670 errmsg("constraint \"%s\" for relation \"%s\" already exists",
671 newname, get_rel_name(con->conrelid))));
672 if (OidIsValid(con->contypid) &&
673 ConstraintNameIsUsed(CONSTRAINT_DOMAIN,
674 con->contypid,
675 newname))
676 ereport(ERROR,
677 (errcode(ERRCODE_DUPLICATE_OBJECT),
678 errmsg("constraint \"%s\" for domain %s already exists",
679 newname, format_type_be(con->contypid))));
680
681 /* OK, do the rename --- tuple is a copy, so OK to scribble on it */
682 namestrcpy(&(con->conname), newname);
683
684 CatalogTupleUpdate(conDesc, &tuple->t_self, tuple);
685
686 InvokeObjectPostAlterHook(ConstraintRelationId, conId, 0);
687
688 heap_freetuple(tuple);
689 table_close(conDesc, RowExclusiveLock);
690 }
691
692 /*
693 * AlterConstraintNamespaces
694 * Find any constraints belonging to the specified object,
695 * and move them to the specified new namespace.
696 *
697 * isType indicates whether the owning object is a type or a relation.
698 */
699 void
AlterConstraintNamespaces(Oid ownerId,Oid oldNspId,Oid newNspId,bool isType,ObjectAddresses * objsMoved)700 AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
701 Oid newNspId, bool isType, ObjectAddresses *objsMoved)
702 {
703 Relation conRel;
704 ScanKeyData key[2];
705 SysScanDesc scan;
706 HeapTuple tup;
707
708 conRel = table_open(ConstraintRelationId, RowExclusiveLock);
709
710 ScanKeyInit(&key[0],
711 Anum_pg_constraint_conrelid,
712 BTEqualStrategyNumber, F_OIDEQ,
713 ObjectIdGetDatum(isType ? InvalidOid : ownerId));
714 ScanKeyInit(&key[1],
715 Anum_pg_constraint_contypid,
716 BTEqualStrategyNumber, F_OIDEQ,
717 ObjectIdGetDatum(isType ? ownerId : InvalidOid));
718
719 scan = systable_beginscan(conRel, ConstraintRelidTypidNameIndexId, true,
720 NULL, 2, key);
721
722 while (HeapTupleIsValid((tup = systable_getnext(scan))))
723 {
724 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
725 ObjectAddress thisobj;
726
727 ObjectAddressSet(thisobj, ConstraintRelationId, conform->oid);
728
729 if (object_address_present(&thisobj, objsMoved))
730 continue;
731
732 /* Don't update if the object is already part of the namespace */
733 if (conform->connamespace == oldNspId && oldNspId != newNspId)
734 {
735 tup = heap_copytuple(tup);
736 conform = (Form_pg_constraint) GETSTRUCT(tup);
737
738 conform->connamespace = newNspId;
739
740 CatalogTupleUpdate(conRel, &tup->t_self, tup);
741
742 /*
743 * Note: currently, the constraint will not have its own
744 * dependency on the namespace, so we don't need to do
745 * changeDependencyFor().
746 */
747 }
748
749 InvokeObjectPostAlterHook(ConstraintRelationId, thisobj.objectId, 0);
750
751 add_exact_object_address(&thisobj, objsMoved);
752 }
753
754 systable_endscan(scan);
755
756 table_close(conRel, RowExclusiveLock);
757 }
758
759 /*
760 * ConstraintSetParentConstraint
761 * Set a partition's constraint as child of its parent constraint,
762 * or remove the linkage if parentConstrId is InvalidOid.
763 *
764 * This updates the constraint's pg_constraint row to show it as inherited, and
765 * adds PARTITION dependencies to prevent the constraint from being deleted
766 * on its own. Alternatively, reverse that.
767 */
768 void
ConstraintSetParentConstraint(Oid childConstrId,Oid parentConstrId,Oid childTableId)769 ConstraintSetParentConstraint(Oid childConstrId,
770 Oid parentConstrId,
771 Oid childTableId)
772 {
773 Relation constrRel;
774 Form_pg_constraint constrForm;
775 HeapTuple tuple,
776 newtup;
777 ObjectAddress depender;
778 ObjectAddress referenced;
779
780 constrRel = table_open(ConstraintRelationId, RowExclusiveLock);
781 tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(childConstrId));
782 if (!HeapTupleIsValid(tuple))
783 elog(ERROR, "cache lookup failed for constraint %u", childConstrId);
784 newtup = heap_copytuple(tuple);
785 constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
786 if (OidIsValid(parentConstrId))
787 {
788 /* don't allow setting parent for a constraint that already has one */
789 Assert(constrForm->coninhcount == 0);
790 if (constrForm->conparentid != InvalidOid)
791 elog(ERROR, "constraint %u already has a parent constraint",
792 childConstrId);
793
794 constrForm->conislocal = false;
795 constrForm->coninhcount++;
796 constrForm->conparentid = parentConstrId;
797
798 CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
799
800 ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
801
802 ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
803 recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_PRI);
804
805 ObjectAddressSet(referenced, RelationRelationId, childTableId);
806 recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_SEC);
807 }
808 else
809 {
810 constrForm->coninhcount--;
811 constrForm->conislocal = true;
812 constrForm->conparentid = InvalidOid;
813
814 /* Make sure there's no further inheritance. */
815 Assert(constrForm->coninhcount == 0);
816
817 CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
818
819 deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
820 ConstraintRelationId,
821 DEPENDENCY_PARTITION_PRI);
822 deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
823 RelationRelationId,
824 DEPENDENCY_PARTITION_SEC);
825 }
826
827 ReleaseSysCache(tuple);
828 table_close(constrRel, RowExclusiveLock);
829 }
830
831
832 /*
833 * get_relation_constraint_oid
834 * Find a constraint on the specified relation with the specified name.
835 * Returns constraint's OID.
836 */
837 Oid
get_relation_constraint_oid(Oid relid,const char * conname,bool missing_ok)838 get_relation_constraint_oid(Oid relid, const char *conname, bool missing_ok)
839 {
840 Relation pg_constraint;
841 HeapTuple tuple;
842 SysScanDesc scan;
843 ScanKeyData skey[3];
844 Oid conOid = InvalidOid;
845
846 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
847
848 ScanKeyInit(&skey[0],
849 Anum_pg_constraint_conrelid,
850 BTEqualStrategyNumber, F_OIDEQ,
851 ObjectIdGetDatum(relid));
852 ScanKeyInit(&skey[1],
853 Anum_pg_constraint_contypid,
854 BTEqualStrategyNumber, F_OIDEQ,
855 ObjectIdGetDatum(InvalidOid));
856 ScanKeyInit(&skey[2],
857 Anum_pg_constraint_conname,
858 BTEqualStrategyNumber, F_NAMEEQ,
859 CStringGetDatum(conname));
860
861 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
862 NULL, 3, skey);
863
864 /* There can be at most one matching row */
865 if (HeapTupleIsValid(tuple = systable_getnext(scan)))
866 conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
867
868 systable_endscan(scan);
869
870 /* If no such constraint exists, complain */
871 if (!OidIsValid(conOid) && !missing_ok)
872 ereport(ERROR,
873 (errcode(ERRCODE_UNDEFINED_OBJECT),
874 errmsg("constraint \"%s\" for table \"%s\" does not exist",
875 conname, get_rel_name(relid))));
876
877 table_close(pg_constraint, AccessShareLock);
878
879 return conOid;
880 }
881
882 /*
883 * get_relation_constraint_attnos
884 * Find a constraint on the specified relation with the specified name
885 * and return the constrained columns.
886 *
887 * Returns a Bitmapset of the column attnos of the constrained columns, with
888 * attnos being offset by FirstLowInvalidHeapAttributeNumber so that system
889 * columns can be represented.
890 *
891 * *constraintOid is set to the OID of the constraint, or InvalidOid on
892 * failure.
893 */
894 Bitmapset *
get_relation_constraint_attnos(Oid relid,const char * conname,bool missing_ok,Oid * constraintOid)895 get_relation_constraint_attnos(Oid relid, const char *conname,
896 bool missing_ok, Oid *constraintOid)
897 {
898 Bitmapset *conattnos = NULL;
899 Relation pg_constraint;
900 HeapTuple tuple;
901 SysScanDesc scan;
902 ScanKeyData skey[3];
903
904 /* Set *constraintOid, to avoid complaints about uninitialized vars */
905 *constraintOid = InvalidOid;
906
907 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
908
909 ScanKeyInit(&skey[0],
910 Anum_pg_constraint_conrelid,
911 BTEqualStrategyNumber, F_OIDEQ,
912 ObjectIdGetDatum(relid));
913 ScanKeyInit(&skey[1],
914 Anum_pg_constraint_contypid,
915 BTEqualStrategyNumber, F_OIDEQ,
916 ObjectIdGetDatum(InvalidOid));
917 ScanKeyInit(&skey[2],
918 Anum_pg_constraint_conname,
919 BTEqualStrategyNumber, F_NAMEEQ,
920 CStringGetDatum(conname));
921
922 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
923 NULL, 3, skey);
924
925 /* There can be at most one matching row */
926 if (HeapTupleIsValid(tuple = systable_getnext(scan)))
927 {
928 Datum adatum;
929 bool isNull;
930
931 *constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
932
933 /* Extract the conkey array, ie, attnums of constrained columns */
934 adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
935 RelationGetDescr(pg_constraint), &isNull);
936 if (!isNull)
937 {
938 ArrayType *arr;
939 int numcols;
940 int16 *attnums;
941 int i;
942
943 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
944 numcols = ARR_DIMS(arr)[0];
945 if (ARR_NDIM(arr) != 1 ||
946 numcols < 0 ||
947 ARR_HASNULL(arr) ||
948 ARR_ELEMTYPE(arr) != INT2OID)
949 elog(ERROR, "conkey is not a 1-D smallint array");
950 attnums = (int16 *) ARR_DATA_PTR(arr);
951
952 /* Construct the result value */
953 for (i = 0; i < numcols; i++)
954 {
955 conattnos = bms_add_member(conattnos,
956 attnums[i] - FirstLowInvalidHeapAttributeNumber);
957 }
958 }
959 }
960
961 systable_endscan(scan);
962
963 /* If no such constraint exists, complain */
964 if (!OidIsValid(*constraintOid) && !missing_ok)
965 ereport(ERROR,
966 (errcode(ERRCODE_UNDEFINED_OBJECT),
967 errmsg("constraint \"%s\" for table \"%s\" does not exist",
968 conname, get_rel_name(relid))));
969
970 table_close(pg_constraint, AccessShareLock);
971
972 return conattnos;
973 }
974
975 /*
976 * Return the OID of the constraint associated with the given index in the
977 * given relation; or InvalidOid if no such index is catalogued.
978 */
979 Oid
get_relation_idx_constraint_oid(Oid relationId,Oid indexId)980 get_relation_idx_constraint_oid(Oid relationId, Oid indexId)
981 {
982 Relation pg_constraint;
983 SysScanDesc scan;
984 ScanKeyData key;
985 HeapTuple tuple;
986 Oid constraintId = InvalidOid;
987
988 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
989
990 ScanKeyInit(&key,
991 Anum_pg_constraint_conrelid,
992 BTEqualStrategyNumber,
993 F_OIDEQ,
994 ObjectIdGetDatum(relationId));
995 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
996 true, NULL, 1, &key);
997 while ((tuple = systable_getnext(scan)) != NULL)
998 {
999 Form_pg_constraint constrForm;
1000
1001 constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
1002 if (constrForm->conindid == indexId)
1003 {
1004 constraintId = constrForm->oid;
1005 break;
1006 }
1007 }
1008 systable_endscan(scan);
1009
1010 table_close(pg_constraint, AccessShareLock);
1011 return constraintId;
1012 }
1013
1014 /*
1015 * get_domain_constraint_oid
1016 * Find a constraint on the specified domain with the specified name.
1017 * Returns constraint's OID.
1018 */
1019 Oid
get_domain_constraint_oid(Oid typid,const char * conname,bool missing_ok)1020 get_domain_constraint_oid(Oid typid, const char *conname, bool missing_ok)
1021 {
1022 Relation pg_constraint;
1023 HeapTuple tuple;
1024 SysScanDesc scan;
1025 ScanKeyData skey[3];
1026 Oid conOid = InvalidOid;
1027
1028 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1029
1030 ScanKeyInit(&skey[0],
1031 Anum_pg_constraint_conrelid,
1032 BTEqualStrategyNumber, F_OIDEQ,
1033 ObjectIdGetDatum(InvalidOid));
1034 ScanKeyInit(&skey[1],
1035 Anum_pg_constraint_contypid,
1036 BTEqualStrategyNumber, F_OIDEQ,
1037 ObjectIdGetDatum(typid));
1038 ScanKeyInit(&skey[2],
1039 Anum_pg_constraint_conname,
1040 BTEqualStrategyNumber, F_NAMEEQ,
1041 CStringGetDatum(conname));
1042
1043 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1044 NULL, 3, skey);
1045
1046 /* There can be at most one matching row */
1047 if (HeapTupleIsValid(tuple = systable_getnext(scan)))
1048 conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1049
1050 systable_endscan(scan);
1051
1052 /* If no such constraint exists, complain */
1053 if (!OidIsValid(conOid) && !missing_ok)
1054 ereport(ERROR,
1055 (errcode(ERRCODE_UNDEFINED_OBJECT),
1056 errmsg("constraint \"%s\" for domain %s does not exist",
1057 conname, format_type_be(typid))));
1058
1059 table_close(pg_constraint, AccessShareLock);
1060
1061 return conOid;
1062 }
1063
1064 /*
1065 * get_primary_key_attnos
1066 * Identify the columns in a relation's primary key, if any.
1067 *
1068 * Returns a Bitmapset of the column attnos of the primary key's columns,
1069 * with attnos being offset by FirstLowInvalidHeapAttributeNumber so that
1070 * system columns can be represented.
1071 *
1072 * If there is no primary key, return NULL. We also return NULL if the pkey
1073 * constraint is deferrable and deferrableOk is false.
1074 *
1075 * *constraintOid is set to the OID of the pkey constraint, or InvalidOid
1076 * on failure.
1077 */
1078 Bitmapset *
get_primary_key_attnos(Oid relid,bool deferrableOk,Oid * constraintOid)1079 get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid)
1080 {
1081 Bitmapset *pkattnos = NULL;
1082 Relation pg_constraint;
1083 HeapTuple tuple;
1084 SysScanDesc scan;
1085 ScanKeyData skey[1];
1086
1087 /* Set *constraintOid, to avoid complaints about uninitialized vars */
1088 *constraintOid = InvalidOid;
1089
1090 /* Scan pg_constraint for constraints of the target rel */
1091 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1092
1093 ScanKeyInit(&skey[0],
1094 Anum_pg_constraint_conrelid,
1095 BTEqualStrategyNumber, F_OIDEQ,
1096 ObjectIdGetDatum(relid));
1097
1098 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1099 NULL, 1, skey);
1100
1101 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1102 {
1103 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
1104 Datum adatum;
1105 bool isNull;
1106 ArrayType *arr;
1107 int16 *attnums;
1108 int numkeys;
1109 int i;
1110
1111 /* Skip constraints that are not PRIMARY KEYs */
1112 if (con->contype != CONSTRAINT_PRIMARY)
1113 continue;
1114
1115 /*
1116 * If the primary key is deferrable, but we've been instructed to
1117 * ignore deferrable constraints, then we might as well give up
1118 * searching, since there can only be a single primary key on a table.
1119 */
1120 if (con->condeferrable && !deferrableOk)
1121 break;
1122
1123 /* Extract the conkey array, ie, attnums of PK's columns */
1124 adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
1125 RelationGetDescr(pg_constraint), &isNull);
1126 if (isNull)
1127 elog(ERROR, "null conkey for constraint %u",
1128 ((Form_pg_constraint) GETSTRUCT(tuple))->oid);
1129 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1130 numkeys = ARR_DIMS(arr)[0];
1131 if (ARR_NDIM(arr) != 1 ||
1132 numkeys < 0 ||
1133 ARR_HASNULL(arr) ||
1134 ARR_ELEMTYPE(arr) != INT2OID)
1135 elog(ERROR, "conkey is not a 1-D smallint array");
1136 attnums = (int16 *) ARR_DATA_PTR(arr);
1137
1138 /* Construct the result value */
1139 for (i = 0; i < numkeys; i++)
1140 {
1141 pkattnos = bms_add_member(pkattnos,
1142 attnums[i] - FirstLowInvalidHeapAttributeNumber);
1143 }
1144 *constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1145
1146 /* No need to search further */
1147 break;
1148 }
1149
1150 systable_endscan(scan);
1151
1152 table_close(pg_constraint, AccessShareLock);
1153
1154 return pkattnos;
1155 }
1156
1157 /*
1158 * Extract data from the pg_constraint tuple of a foreign-key constraint.
1159 *
1160 * All arguments save the first are output arguments; the last three of them
1161 * can be passed as NULL if caller doesn't need them.
1162 */
1163 void
DeconstructFkConstraintRow(HeapTuple tuple,int * numfks,AttrNumber * conkey,AttrNumber * confkey,Oid * pf_eq_oprs,Oid * pp_eq_oprs,Oid * ff_eq_oprs)1164 DeconstructFkConstraintRow(HeapTuple tuple, int *numfks,
1165 AttrNumber *conkey, AttrNumber *confkey,
1166 Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs)
1167 {
1168 Oid constrId;
1169 Datum adatum;
1170 bool isNull;
1171 ArrayType *arr;
1172 int numkeys;
1173
1174 constrId = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1175
1176 /*
1177 * We expect the arrays to be 1-D arrays of the right types; verify that.
1178 * We don't need to use deconstruct_array() since the array data is just
1179 * going to look like a C array of values.
1180 */
1181 adatum = SysCacheGetAttr(CONSTROID, tuple,
1182 Anum_pg_constraint_conkey, &isNull);
1183 if (isNull)
1184 elog(ERROR, "null conkey for constraint %u", constrId);
1185 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1186 if (ARR_NDIM(arr) != 1 ||
1187 ARR_HASNULL(arr) ||
1188 ARR_ELEMTYPE(arr) != INT2OID)
1189 elog(ERROR, "conkey is not a 1-D smallint array");
1190 numkeys = ARR_DIMS(arr)[0];
1191 if (numkeys <= 0 || numkeys > INDEX_MAX_KEYS)
1192 elog(ERROR, "foreign key constraint cannot have %d columns", numkeys);
1193 memcpy(conkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1194 if ((Pointer) arr != DatumGetPointer(adatum))
1195 pfree(arr); /* free de-toasted copy, if any */
1196
1197 adatum = SysCacheGetAttr(CONSTROID, tuple,
1198 Anum_pg_constraint_confkey, &isNull);
1199 if (isNull)
1200 elog(ERROR, "null confkey for constraint %u", constrId);
1201 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1202 if (ARR_NDIM(arr) != 1 ||
1203 ARR_DIMS(arr)[0] != numkeys ||
1204 ARR_HASNULL(arr) ||
1205 ARR_ELEMTYPE(arr) != INT2OID)
1206 elog(ERROR, "confkey is not a 1-D smallint array");
1207 memcpy(confkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1208 if ((Pointer) arr != DatumGetPointer(adatum))
1209 pfree(arr); /* free de-toasted copy, if any */
1210
1211 if (pf_eq_oprs)
1212 {
1213 adatum = SysCacheGetAttr(CONSTROID, tuple,
1214 Anum_pg_constraint_conpfeqop, &isNull);
1215 if (isNull)
1216 elog(ERROR, "null conpfeqop for constraint %u", constrId);
1217 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1218 /* see TryReuseForeignKey if you change the test below */
1219 if (ARR_NDIM(arr) != 1 ||
1220 ARR_DIMS(arr)[0] != numkeys ||
1221 ARR_HASNULL(arr) ||
1222 ARR_ELEMTYPE(arr) != OIDOID)
1223 elog(ERROR, "conpfeqop is not a 1-D Oid array");
1224 memcpy(pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1225 if ((Pointer) arr != DatumGetPointer(adatum))
1226 pfree(arr); /* free de-toasted copy, if any */
1227 }
1228
1229 if (pp_eq_oprs)
1230 {
1231 adatum = SysCacheGetAttr(CONSTROID, tuple,
1232 Anum_pg_constraint_conppeqop, &isNull);
1233 if (isNull)
1234 elog(ERROR, "null conppeqop for constraint %u", constrId);
1235 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1236 if (ARR_NDIM(arr) != 1 ||
1237 ARR_DIMS(arr)[0] != numkeys ||
1238 ARR_HASNULL(arr) ||
1239 ARR_ELEMTYPE(arr) != OIDOID)
1240 elog(ERROR, "conppeqop is not a 1-D Oid array");
1241 memcpy(pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1242 if ((Pointer) arr != DatumGetPointer(adatum))
1243 pfree(arr); /* free de-toasted copy, if any */
1244 }
1245
1246 if (ff_eq_oprs)
1247 {
1248 adatum = SysCacheGetAttr(CONSTROID, tuple,
1249 Anum_pg_constraint_conffeqop, &isNull);
1250 if (isNull)
1251 elog(ERROR, "null conffeqop for constraint %u", constrId);
1252 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1253 if (ARR_NDIM(arr) != 1 ||
1254 ARR_DIMS(arr)[0] != numkeys ||
1255 ARR_HASNULL(arr) ||
1256 ARR_ELEMTYPE(arr) != OIDOID)
1257 elog(ERROR, "conffeqop is not a 1-D Oid array");
1258 memcpy(ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1259 if ((Pointer) arr != DatumGetPointer(adatum))
1260 pfree(arr); /* free de-toasted copy, if any */
1261 }
1262
1263 *numfks = numkeys;
1264 }
1265
1266 /*
1267 * Determine whether a relation can be proven functionally dependent on
1268 * a set of grouping columns. If so, return true and add the pg_constraint
1269 * OIDs of the constraints needed for the proof to the *constraintDeps list.
1270 *
1271 * grouping_columns is a list of grouping expressions, in which columns of
1272 * the rel of interest are Vars with the indicated varno/varlevelsup.
1273 *
1274 * Currently we only check to see if the rel has a primary key that is a
1275 * subset of the grouping_columns. We could also use plain unique constraints
1276 * if all their columns are known not null, but there's a problem: we need
1277 * to be able to represent the not-null-ness as part of the constraints added
1278 * to *constraintDeps. FIXME whenever not-null constraints get represented
1279 * in pg_constraint.
1280 */
1281 bool
check_functional_grouping(Oid relid,Index varno,Index varlevelsup,List * grouping_columns,List ** constraintDeps)1282 check_functional_grouping(Oid relid,
1283 Index varno, Index varlevelsup,
1284 List *grouping_columns,
1285 List **constraintDeps)
1286 {
1287 Bitmapset *pkattnos;
1288 Bitmapset *groupbyattnos;
1289 Oid constraintOid;
1290 ListCell *gl;
1291
1292 /* If the rel has no PK, then we can't prove functional dependency */
1293 pkattnos = get_primary_key_attnos(relid, false, &constraintOid);
1294 if (pkattnos == NULL)
1295 return false;
1296
1297 /* Identify all the rel's columns that appear in grouping_columns */
1298 groupbyattnos = NULL;
1299 foreach(gl, grouping_columns)
1300 {
1301 Var *gvar = (Var *) lfirst(gl);
1302
1303 if (IsA(gvar, Var) &&
1304 gvar->varno == varno &&
1305 gvar->varlevelsup == varlevelsup)
1306 groupbyattnos = bms_add_member(groupbyattnos,
1307 gvar->varattno - FirstLowInvalidHeapAttributeNumber);
1308 }
1309
1310 if (bms_is_subset(pkattnos, groupbyattnos))
1311 {
1312 /* The PK is a subset of grouping_columns, so we win */
1313 *constraintDeps = lappend_oid(*constraintDeps, constraintOid);
1314 return true;
1315 }
1316
1317 return false;
1318 }
1319