1 /*-------------------------------------------------------------------------
2  *
3  * pg_constraint.c
4  *	  routines to support manipulation of the pg_constraint relation
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/catalog/pg_constraint.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/sysattr.h"
20 #include "access/table.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/indexing.h"
25 #include "catalog/objectaccess.h"
26 #include "catalog/pg_constraint.h"
27 #include "catalog/pg_operator.h"
28 #include "catalog/pg_type.h"
29 #include "commands/defrem.h"
30 #include "commands/tablecmds.h"
31 #include "utils/array.h"
32 #include "utils/builtins.h"
33 #include "utils/fmgroids.h"
34 #include "utils/lsyscache.h"
35 #include "utils/rel.h"
36 #include "utils/syscache.h"
37 
38 
39 /*
40  * CreateConstraintEntry
41  *	Create a constraint table entry.
42  *
43  * Subsidiary records (such as triggers or indexes to implement the
44  * constraint) are *not* created here.  But we do make dependency links
45  * from the constraint to the things it depends on.
46  *
47  * The new constraint's OID is returned.
48  */
49 Oid
CreateConstraintEntry(const char * constraintName,Oid constraintNamespace,char constraintType,bool isDeferrable,bool isDeferred,bool isValidated,Oid parentConstrId,Oid relId,const int16 * constraintKey,int constraintNKeys,int constraintNTotalKeys,Oid domainId,Oid indexRelId,Oid foreignRelId,const int16 * foreignKey,const Oid * pfEqOp,const Oid * ppEqOp,const Oid * ffEqOp,int foreignNKeys,char foreignUpdateType,char foreignDeleteType,char foreignMatchType,const Oid * exclOp,Node * conExpr,const char * conBin,bool conIsLocal,int conInhCount,bool conNoInherit,bool is_internal)50 CreateConstraintEntry(const char *constraintName,
51 					  Oid constraintNamespace,
52 					  char constraintType,
53 					  bool isDeferrable,
54 					  bool isDeferred,
55 					  bool isValidated,
56 					  Oid parentConstrId,
57 					  Oid relId,
58 					  const int16 *constraintKey,
59 					  int constraintNKeys,
60 					  int constraintNTotalKeys,
61 					  Oid domainId,
62 					  Oid indexRelId,
63 					  Oid foreignRelId,
64 					  const int16 *foreignKey,
65 					  const Oid *pfEqOp,
66 					  const Oid *ppEqOp,
67 					  const Oid *ffEqOp,
68 					  int foreignNKeys,
69 					  char foreignUpdateType,
70 					  char foreignDeleteType,
71 					  char foreignMatchType,
72 					  const Oid *exclOp,
73 					  Node *conExpr,
74 					  const char *conBin,
75 					  bool conIsLocal,
76 					  int conInhCount,
77 					  bool conNoInherit,
78 					  bool is_internal)
79 {
80 	Relation	conDesc;
81 	Oid			conOid;
82 	HeapTuple	tup;
83 	bool		nulls[Natts_pg_constraint];
84 	Datum		values[Natts_pg_constraint];
85 	ArrayType  *conkeyArray;
86 	ArrayType  *confkeyArray;
87 	ArrayType  *conpfeqopArray;
88 	ArrayType  *conppeqopArray;
89 	ArrayType  *conffeqopArray;
90 	ArrayType  *conexclopArray;
91 	NameData	cname;
92 	int			i;
93 	ObjectAddress conobject;
94 	ObjectAddresses *addrs_auto;
95 	ObjectAddresses *addrs_normal;
96 
97 	conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
98 
99 	Assert(constraintName);
100 	namestrcpy(&cname, constraintName);
101 
102 	/*
103 	 * Convert C arrays into Postgres arrays.
104 	 */
105 	if (constraintNKeys > 0)
106 	{
107 		Datum	   *conkey;
108 
109 		conkey = (Datum *) palloc(constraintNKeys * sizeof(Datum));
110 		for (i = 0; i < constraintNKeys; i++)
111 			conkey[i] = Int16GetDatum(constraintKey[i]);
112 		conkeyArray = construct_array(conkey, constraintNKeys,
113 									  INT2OID, 2, true, TYPALIGN_SHORT);
114 	}
115 	else
116 		conkeyArray = NULL;
117 
118 	if (foreignNKeys > 0)
119 	{
120 		Datum	   *fkdatums;
121 
122 		fkdatums = (Datum *) palloc(foreignNKeys * sizeof(Datum));
123 		for (i = 0; i < foreignNKeys; i++)
124 			fkdatums[i] = Int16GetDatum(foreignKey[i]);
125 		confkeyArray = construct_array(fkdatums, foreignNKeys,
126 									   INT2OID, 2, true, TYPALIGN_SHORT);
127 		for (i = 0; i < foreignNKeys; i++)
128 			fkdatums[i] = ObjectIdGetDatum(pfEqOp[i]);
129 		conpfeqopArray = construct_array(fkdatums, foreignNKeys,
130 										 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
131 		for (i = 0; i < foreignNKeys; i++)
132 			fkdatums[i] = ObjectIdGetDatum(ppEqOp[i]);
133 		conppeqopArray = construct_array(fkdatums, foreignNKeys,
134 										 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
135 		for (i = 0; i < foreignNKeys; i++)
136 			fkdatums[i] = ObjectIdGetDatum(ffEqOp[i]);
137 		conffeqopArray = construct_array(fkdatums, foreignNKeys,
138 										 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
139 	}
140 	else
141 	{
142 		confkeyArray = NULL;
143 		conpfeqopArray = NULL;
144 		conppeqopArray = NULL;
145 		conffeqopArray = NULL;
146 	}
147 
148 	if (exclOp != NULL)
149 	{
150 		Datum	   *opdatums;
151 
152 		opdatums = (Datum *) palloc(constraintNKeys * sizeof(Datum));
153 		for (i = 0; i < constraintNKeys; i++)
154 			opdatums[i] = ObjectIdGetDatum(exclOp[i]);
155 		conexclopArray = construct_array(opdatums, constraintNKeys,
156 										 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
157 	}
158 	else
159 		conexclopArray = NULL;
160 
161 	/* initialize nulls and values */
162 	for (i = 0; i < Natts_pg_constraint; i++)
163 	{
164 		nulls[i] = false;
165 		values[i] = (Datum) NULL;
166 	}
167 
168 	conOid = GetNewOidWithIndex(conDesc, ConstraintOidIndexId,
169 								Anum_pg_constraint_oid);
170 	values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(conOid);
171 	values[Anum_pg_constraint_conname - 1] = NameGetDatum(&cname);
172 	values[Anum_pg_constraint_connamespace - 1] = ObjectIdGetDatum(constraintNamespace);
173 	values[Anum_pg_constraint_contype - 1] = CharGetDatum(constraintType);
174 	values[Anum_pg_constraint_condeferrable - 1] = BoolGetDatum(isDeferrable);
175 	values[Anum_pg_constraint_condeferred - 1] = BoolGetDatum(isDeferred);
176 	values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValidated);
177 	values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(relId);
178 	values[Anum_pg_constraint_contypid - 1] = ObjectIdGetDatum(domainId);
179 	values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(indexRelId);
180 	values[Anum_pg_constraint_conparentid - 1] = ObjectIdGetDatum(parentConstrId);
181 	values[Anum_pg_constraint_confrelid - 1] = ObjectIdGetDatum(foreignRelId);
182 	values[Anum_pg_constraint_confupdtype - 1] = CharGetDatum(foreignUpdateType);
183 	values[Anum_pg_constraint_confdeltype - 1] = CharGetDatum(foreignDeleteType);
184 	values[Anum_pg_constraint_confmatchtype - 1] = CharGetDatum(foreignMatchType);
185 	values[Anum_pg_constraint_conislocal - 1] = BoolGetDatum(conIsLocal);
186 	values[Anum_pg_constraint_coninhcount - 1] = Int32GetDatum(conInhCount);
187 	values[Anum_pg_constraint_connoinherit - 1] = BoolGetDatum(conNoInherit);
188 
189 	if (conkeyArray)
190 		values[Anum_pg_constraint_conkey - 1] = PointerGetDatum(conkeyArray);
191 	else
192 		nulls[Anum_pg_constraint_conkey - 1] = true;
193 
194 	if (confkeyArray)
195 		values[Anum_pg_constraint_confkey - 1] = PointerGetDatum(confkeyArray);
196 	else
197 		nulls[Anum_pg_constraint_confkey - 1] = true;
198 
199 	if (conpfeqopArray)
200 		values[Anum_pg_constraint_conpfeqop - 1] = PointerGetDatum(conpfeqopArray);
201 	else
202 		nulls[Anum_pg_constraint_conpfeqop - 1] = true;
203 
204 	if (conppeqopArray)
205 		values[Anum_pg_constraint_conppeqop - 1] = PointerGetDatum(conppeqopArray);
206 	else
207 		nulls[Anum_pg_constraint_conppeqop - 1] = true;
208 
209 	if (conffeqopArray)
210 		values[Anum_pg_constraint_conffeqop - 1] = PointerGetDatum(conffeqopArray);
211 	else
212 		nulls[Anum_pg_constraint_conffeqop - 1] = true;
213 
214 	if (conexclopArray)
215 		values[Anum_pg_constraint_conexclop - 1] = PointerGetDatum(conexclopArray);
216 	else
217 		nulls[Anum_pg_constraint_conexclop - 1] = true;
218 
219 	if (conBin)
220 		values[Anum_pg_constraint_conbin - 1] = CStringGetTextDatum(conBin);
221 	else
222 		nulls[Anum_pg_constraint_conbin - 1] = true;
223 
224 	tup = heap_form_tuple(RelationGetDescr(conDesc), values, nulls);
225 
226 	CatalogTupleInsert(conDesc, tup);
227 
228 	ObjectAddressSet(conobject, ConstraintRelationId, conOid);
229 
230 	table_close(conDesc, RowExclusiveLock);
231 
232 	/* Handle set of auto dependencies */
233 	addrs_auto = new_object_addresses();
234 
235 	if (OidIsValid(relId))
236 	{
237 		/*
238 		 * Register auto dependency from constraint to owning relation, or to
239 		 * specific column(s) if any are mentioned.
240 		 */
241 		ObjectAddress relobject;
242 
243 		if (constraintNTotalKeys > 0)
244 		{
245 			for (i = 0; i < constraintNTotalKeys; i++)
246 			{
247 				ObjectAddressSubSet(relobject, RelationRelationId, relId,
248 									constraintKey[i]);
249 				add_exact_object_address(&relobject, addrs_auto);
250 			}
251 		}
252 		else
253 		{
254 			ObjectAddressSet(relobject, RelationRelationId, relId);
255 			add_exact_object_address(&relobject, addrs_auto);
256 		}
257 	}
258 
259 	if (OidIsValid(domainId))
260 	{
261 		/*
262 		 * Register auto dependency from constraint to owning domain
263 		 */
264 		ObjectAddress domobject;
265 
266 		ObjectAddressSet(domobject, TypeRelationId, domainId);
267 		add_exact_object_address(&domobject, addrs_auto);
268 	}
269 
270 	record_object_address_dependencies(&conobject, addrs_auto,
271 									   DEPENDENCY_AUTO);
272 	free_object_addresses(addrs_auto);
273 
274 	/* Handle set of normal dependencies */
275 	addrs_normal = new_object_addresses();
276 
277 	if (OidIsValid(foreignRelId))
278 	{
279 		/*
280 		 * Register normal dependency from constraint to foreign relation, or
281 		 * to specific column(s) if any are mentioned.
282 		 */
283 		ObjectAddress relobject;
284 
285 		if (foreignNKeys > 0)
286 		{
287 			for (i = 0; i < foreignNKeys; i++)
288 			{
289 				ObjectAddressSubSet(relobject, RelationRelationId,
290 									foreignRelId, foreignKey[i]);
291 				add_exact_object_address(&relobject, addrs_normal);
292 			}
293 		}
294 		else
295 		{
296 			ObjectAddressSet(relobject, RelationRelationId, foreignRelId);
297 			add_exact_object_address(&relobject, addrs_normal);
298 		}
299 	}
300 
301 	if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN)
302 	{
303 		/*
304 		 * Register normal dependency on the unique index that supports a
305 		 * foreign-key constraint.  (Note: for indexes associated with unique
306 		 * or primary-key constraints, the dependency runs the other way, and
307 		 * is not made here.)
308 		 */
309 		ObjectAddress relobject;
310 
311 		ObjectAddressSet(relobject, RelationRelationId, indexRelId);
312 		add_exact_object_address(&relobject, addrs_normal);
313 	}
314 
315 	if (foreignNKeys > 0)
316 	{
317 		/*
318 		 * Register normal dependencies on the equality operators that support
319 		 * a foreign-key constraint.  If the PK and FK types are the same then
320 		 * all three operators for a column are the same; otherwise they are
321 		 * different.
322 		 */
323 		ObjectAddress oprobject;
324 
325 		oprobject.classId = OperatorRelationId;
326 		oprobject.objectSubId = 0;
327 
328 		for (i = 0; i < foreignNKeys; i++)
329 		{
330 			oprobject.objectId = pfEqOp[i];
331 			add_exact_object_address(&oprobject, addrs_normal);
332 			if (ppEqOp[i] != pfEqOp[i])
333 			{
334 				oprobject.objectId = ppEqOp[i];
335 				add_exact_object_address(&oprobject, addrs_normal);
336 			}
337 			if (ffEqOp[i] != pfEqOp[i])
338 			{
339 				oprobject.objectId = ffEqOp[i];
340 				add_exact_object_address(&oprobject, addrs_normal);
341 			}
342 		}
343 	}
344 
345 	record_object_address_dependencies(&conobject, addrs_normal,
346 									   DEPENDENCY_NORMAL);
347 	free_object_addresses(addrs_normal);
348 
349 	/*
350 	 * We don't bother to register dependencies on the exclusion operators of
351 	 * an exclusion constraint.  We assume they are members of the opclass
352 	 * supporting the index, so there's an indirect dependency via that. (This
353 	 * would be pretty dicey for cross-type operators, but exclusion operators
354 	 * can never be cross-type.)
355 	 */
356 
357 	if (conExpr != NULL)
358 	{
359 		/*
360 		 * Register dependencies from constraint to objects mentioned in CHECK
361 		 * expression.
362 		 */
363 		recordDependencyOnSingleRelExpr(&conobject, conExpr, relId,
364 										DEPENDENCY_NORMAL,
365 										DEPENDENCY_NORMAL, false);
366 	}
367 
368 	/* Post creation hook for new constraint */
369 	InvokeObjectPostCreateHookArg(ConstraintRelationId, conOid, 0,
370 								  is_internal);
371 
372 	return conOid;
373 }
374 
375 /*
376  * Test whether given name is currently used as a constraint name
377  * for the given object (relation or domain).
378  *
379  * This is used to decide whether to accept a user-specified constraint name.
380  * It is deliberately not the same test as ChooseConstraintName uses to decide
381  * whether an auto-generated name is OK: here, we will allow it unless there
382  * is an identical constraint name in use *on the same object*.
383  *
384  * NB: Caller should hold exclusive lock on the given object, else
385  * this test can be fooled by concurrent additions.
386  */
387 bool
ConstraintNameIsUsed(ConstraintCategory conCat,Oid objId,const char * conname)388 ConstraintNameIsUsed(ConstraintCategory conCat, Oid objId,
389 					 const char *conname)
390 {
391 	bool		found;
392 	Relation	conDesc;
393 	SysScanDesc conscan;
394 	ScanKeyData skey[3];
395 
396 	conDesc = table_open(ConstraintRelationId, AccessShareLock);
397 
398 	ScanKeyInit(&skey[0],
399 				Anum_pg_constraint_conrelid,
400 				BTEqualStrategyNumber, F_OIDEQ,
401 				ObjectIdGetDatum((conCat == CONSTRAINT_RELATION)
402 								 ? objId : InvalidOid));
403 	ScanKeyInit(&skey[1],
404 				Anum_pg_constraint_contypid,
405 				BTEqualStrategyNumber, F_OIDEQ,
406 				ObjectIdGetDatum((conCat == CONSTRAINT_DOMAIN)
407 								 ? objId : InvalidOid));
408 	ScanKeyInit(&skey[2],
409 				Anum_pg_constraint_conname,
410 				BTEqualStrategyNumber, F_NAMEEQ,
411 				CStringGetDatum(conname));
412 
413 	conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId,
414 								 true, NULL, 3, skey);
415 
416 	/* There can be at most one matching row */
417 	found = (HeapTupleIsValid(systable_getnext(conscan)));
418 
419 	systable_endscan(conscan);
420 	table_close(conDesc, AccessShareLock);
421 
422 	return found;
423 }
424 
425 /*
426  * Does any constraint of the given name exist in the given namespace?
427  *
428  * This is used for code that wants to match ChooseConstraintName's rule
429  * that we should avoid autogenerating duplicate constraint names within a
430  * namespace.
431  */
432 bool
ConstraintNameExists(const char * conname,Oid namespaceid)433 ConstraintNameExists(const char *conname, Oid namespaceid)
434 {
435 	bool		found;
436 	Relation	conDesc;
437 	SysScanDesc conscan;
438 	ScanKeyData skey[2];
439 
440 	conDesc = table_open(ConstraintRelationId, AccessShareLock);
441 
442 	ScanKeyInit(&skey[0],
443 				Anum_pg_constraint_conname,
444 				BTEqualStrategyNumber, F_NAMEEQ,
445 				CStringGetDatum(conname));
446 
447 	ScanKeyInit(&skey[1],
448 				Anum_pg_constraint_connamespace,
449 				BTEqualStrategyNumber, F_OIDEQ,
450 				ObjectIdGetDatum(namespaceid));
451 
452 	conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
453 								 NULL, 2, skey);
454 
455 	found = (HeapTupleIsValid(systable_getnext(conscan)));
456 
457 	systable_endscan(conscan);
458 	table_close(conDesc, AccessShareLock);
459 
460 	return found;
461 }
462 
463 /*
464  * Select a nonconflicting name for a new constraint.
465  *
466  * The objective here is to choose a name that is unique within the
467  * specified namespace.  Postgres does not require this, but the SQL
468  * spec does, and some apps depend on it.  Therefore we avoid choosing
469  * default names that so conflict.
470  *
471  * name1, name2, and label are used the same way as for makeObjectName(),
472  * except that the label can't be NULL; digits will be appended to the label
473  * if needed to create a name that is unique within the specified namespace.
474  *
475  * 'others' can be a list of string names already chosen within the current
476  * command (but not yet reflected into the catalogs); we will not choose
477  * a duplicate of one of these either.
478  *
479  * Note: it is theoretically possible to get a collision anyway, if someone
480  * else chooses the same name concurrently.  This is fairly unlikely to be
481  * a problem in practice, especially if one is holding an exclusive lock on
482  * the relation identified by name1.
483  *
484  * Returns a palloc'd string.
485  */
486 char *
ChooseConstraintName(const char * name1,const char * name2,const char * label,Oid namespaceid,List * others)487 ChooseConstraintName(const char *name1, const char *name2,
488 					 const char *label, Oid namespaceid,
489 					 List *others)
490 {
491 	int			pass = 0;
492 	char	   *conname = NULL;
493 	char		modlabel[NAMEDATALEN];
494 	Relation	conDesc;
495 	SysScanDesc conscan;
496 	ScanKeyData skey[2];
497 	bool		found;
498 	ListCell   *l;
499 
500 	conDesc = table_open(ConstraintRelationId, AccessShareLock);
501 
502 	/* try the unmodified label first */
503 	strlcpy(modlabel, label, sizeof(modlabel));
504 
505 	for (;;)
506 	{
507 		conname = makeObjectName(name1, name2, modlabel);
508 
509 		found = false;
510 
511 		foreach(l, others)
512 		{
513 			if (strcmp((char *) lfirst(l), conname) == 0)
514 			{
515 				found = true;
516 				break;
517 			}
518 		}
519 
520 		if (!found)
521 		{
522 			ScanKeyInit(&skey[0],
523 						Anum_pg_constraint_conname,
524 						BTEqualStrategyNumber, F_NAMEEQ,
525 						CStringGetDatum(conname));
526 
527 			ScanKeyInit(&skey[1],
528 						Anum_pg_constraint_connamespace,
529 						BTEqualStrategyNumber, F_OIDEQ,
530 						ObjectIdGetDatum(namespaceid));
531 
532 			conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
533 										 NULL, 2, skey);
534 
535 			found = (HeapTupleIsValid(systable_getnext(conscan)));
536 
537 			systable_endscan(conscan);
538 		}
539 
540 		if (!found)
541 			break;
542 
543 		/* found a conflict, so try a new name component */
544 		pfree(conname);
545 		snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
546 	}
547 
548 	table_close(conDesc, AccessShareLock);
549 
550 	return conname;
551 }
552 
553 /*
554  * Delete a single constraint record.
555  */
556 void
RemoveConstraintById(Oid conId)557 RemoveConstraintById(Oid conId)
558 {
559 	Relation	conDesc;
560 	HeapTuple	tup;
561 	Form_pg_constraint con;
562 
563 	conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
564 
565 	tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conId));
566 	if (!HeapTupleIsValid(tup)) /* should not happen */
567 		elog(ERROR, "cache lookup failed for constraint %u", conId);
568 	con = (Form_pg_constraint) GETSTRUCT(tup);
569 
570 	/*
571 	 * Special processing depending on what the constraint is for.
572 	 */
573 	if (OidIsValid(con->conrelid))
574 	{
575 		Relation	rel;
576 
577 		/*
578 		 * If the constraint is for a relation, open and exclusive-lock the
579 		 * relation it's for.
580 		 */
581 		rel = table_open(con->conrelid, AccessExclusiveLock);
582 
583 		/*
584 		 * We need to update the relchecks count if it is a check constraint
585 		 * being dropped.  This update will force backends to rebuild relcache
586 		 * entries when we commit.
587 		 */
588 		if (con->contype == CONSTRAINT_CHECK)
589 		{
590 			Relation	pgrel;
591 			HeapTuple	relTup;
592 			Form_pg_class classForm;
593 
594 			pgrel = table_open(RelationRelationId, RowExclusiveLock);
595 			relTup = SearchSysCacheCopy1(RELOID,
596 										 ObjectIdGetDatum(con->conrelid));
597 			if (!HeapTupleIsValid(relTup))
598 				elog(ERROR, "cache lookup failed for relation %u",
599 					 con->conrelid);
600 			classForm = (Form_pg_class) GETSTRUCT(relTup);
601 
602 			if (classForm->relchecks == 0)	/* should not happen */
603 				elog(ERROR, "relation \"%s\" has relchecks = 0",
604 					 RelationGetRelationName(rel));
605 			classForm->relchecks--;
606 
607 			CatalogTupleUpdate(pgrel, &relTup->t_self, relTup);
608 
609 			heap_freetuple(relTup);
610 
611 			table_close(pgrel, RowExclusiveLock);
612 		}
613 
614 		/* Keep lock on constraint's rel until end of xact */
615 		table_close(rel, NoLock);
616 	}
617 	else if (OidIsValid(con->contypid))
618 	{
619 		/*
620 		 * XXX for now, do nothing special when dropping a domain constraint
621 		 *
622 		 * Probably there should be some form of locking on the domain type,
623 		 * but we have no such concept at the moment.
624 		 */
625 	}
626 	else
627 		elog(ERROR, "constraint %u is not of a known type", conId);
628 
629 	/* Fry the constraint itself */
630 	CatalogTupleDelete(conDesc, &tup->t_self);
631 
632 	/* Clean up */
633 	ReleaseSysCache(tup);
634 	table_close(conDesc, RowExclusiveLock);
635 }
636 
637 /*
638  * RenameConstraintById
639  *		Rename a constraint.
640  *
641  * Note: this isn't intended to be a user-exposed function; it doesn't check
642  * permissions etc.  Currently this is only invoked when renaming an index
643  * that is associated with a constraint, but it's made a little more general
644  * than that with the expectation of someday having ALTER TABLE RENAME
645  * CONSTRAINT.
646  */
647 void
RenameConstraintById(Oid conId,const char * newname)648 RenameConstraintById(Oid conId, const char *newname)
649 {
650 	Relation	conDesc;
651 	HeapTuple	tuple;
652 	Form_pg_constraint con;
653 
654 	conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
655 
656 	tuple = SearchSysCacheCopy1(CONSTROID, ObjectIdGetDatum(conId));
657 	if (!HeapTupleIsValid(tuple))
658 		elog(ERROR, "cache lookup failed for constraint %u", conId);
659 	con = (Form_pg_constraint) GETSTRUCT(tuple);
660 
661 	/*
662 	 * For user-friendliness, check whether the name is already in use.
663 	 */
664 	if (OidIsValid(con->conrelid) &&
665 		ConstraintNameIsUsed(CONSTRAINT_RELATION,
666 							 con->conrelid,
667 							 newname))
668 		ereport(ERROR,
669 				(errcode(ERRCODE_DUPLICATE_OBJECT),
670 				 errmsg("constraint \"%s\" for relation \"%s\" already exists",
671 						newname, get_rel_name(con->conrelid))));
672 	if (OidIsValid(con->contypid) &&
673 		ConstraintNameIsUsed(CONSTRAINT_DOMAIN,
674 							 con->contypid,
675 							 newname))
676 		ereport(ERROR,
677 				(errcode(ERRCODE_DUPLICATE_OBJECT),
678 				 errmsg("constraint \"%s\" for domain %s already exists",
679 						newname, format_type_be(con->contypid))));
680 
681 	/* OK, do the rename --- tuple is a copy, so OK to scribble on it */
682 	namestrcpy(&(con->conname), newname);
683 
684 	CatalogTupleUpdate(conDesc, &tuple->t_self, tuple);
685 
686 	InvokeObjectPostAlterHook(ConstraintRelationId, conId, 0);
687 
688 	heap_freetuple(tuple);
689 	table_close(conDesc, RowExclusiveLock);
690 }
691 
692 /*
693  * AlterConstraintNamespaces
694  *		Find any constraints belonging to the specified object,
695  *		and move them to the specified new namespace.
696  *
697  * isType indicates whether the owning object is a type or a relation.
698  */
699 void
AlterConstraintNamespaces(Oid ownerId,Oid oldNspId,Oid newNspId,bool isType,ObjectAddresses * objsMoved)700 AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
701 						  Oid newNspId, bool isType, ObjectAddresses *objsMoved)
702 {
703 	Relation	conRel;
704 	ScanKeyData key[2];
705 	SysScanDesc scan;
706 	HeapTuple	tup;
707 
708 	conRel = table_open(ConstraintRelationId, RowExclusiveLock);
709 
710 	ScanKeyInit(&key[0],
711 				Anum_pg_constraint_conrelid,
712 				BTEqualStrategyNumber, F_OIDEQ,
713 				ObjectIdGetDatum(isType ? InvalidOid : ownerId));
714 	ScanKeyInit(&key[1],
715 				Anum_pg_constraint_contypid,
716 				BTEqualStrategyNumber, F_OIDEQ,
717 				ObjectIdGetDatum(isType ? ownerId : InvalidOid));
718 
719 	scan = systable_beginscan(conRel, ConstraintRelidTypidNameIndexId, true,
720 							  NULL, 2, key);
721 
722 	while (HeapTupleIsValid((tup = systable_getnext(scan))))
723 	{
724 		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
725 		ObjectAddress thisobj;
726 
727 		ObjectAddressSet(thisobj, ConstraintRelationId, conform->oid);
728 
729 		if (object_address_present(&thisobj, objsMoved))
730 			continue;
731 
732 		/* Don't update if the object is already part of the namespace */
733 		if (conform->connamespace == oldNspId && oldNspId != newNspId)
734 		{
735 			tup = heap_copytuple(tup);
736 			conform = (Form_pg_constraint) GETSTRUCT(tup);
737 
738 			conform->connamespace = newNspId;
739 
740 			CatalogTupleUpdate(conRel, &tup->t_self, tup);
741 
742 			/*
743 			 * Note: currently, the constraint will not have its own
744 			 * dependency on the namespace, so we don't need to do
745 			 * changeDependencyFor().
746 			 */
747 		}
748 
749 		InvokeObjectPostAlterHook(ConstraintRelationId, thisobj.objectId, 0);
750 
751 		add_exact_object_address(&thisobj, objsMoved);
752 	}
753 
754 	systable_endscan(scan);
755 
756 	table_close(conRel, RowExclusiveLock);
757 }
758 
759 /*
760  * ConstraintSetParentConstraint
761  *		Set a partition's constraint as child of its parent constraint,
762  *		or remove the linkage if parentConstrId is InvalidOid.
763  *
764  * This updates the constraint's pg_constraint row to show it as inherited, and
765  * adds PARTITION dependencies to prevent the constraint from being deleted
766  * on its own.  Alternatively, reverse that.
767  */
768 void
ConstraintSetParentConstraint(Oid childConstrId,Oid parentConstrId,Oid childTableId)769 ConstraintSetParentConstraint(Oid childConstrId,
770 							  Oid parentConstrId,
771 							  Oid childTableId)
772 {
773 	Relation	constrRel;
774 	Form_pg_constraint constrForm;
775 	HeapTuple	tuple,
776 				newtup;
777 	ObjectAddress depender;
778 	ObjectAddress referenced;
779 
780 	constrRel = table_open(ConstraintRelationId, RowExclusiveLock);
781 	tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(childConstrId));
782 	if (!HeapTupleIsValid(tuple))
783 		elog(ERROR, "cache lookup failed for constraint %u", childConstrId);
784 	newtup = heap_copytuple(tuple);
785 	constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
786 	if (OidIsValid(parentConstrId))
787 	{
788 		/* don't allow setting parent for a constraint that already has one */
789 		Assert(constrForm->coninhcount == 0);
790 		if (constrForm->conparentid != InvalidOid)
791 			elog(ERROR, "constraint %u already has a parent constraint",
792 				 childConstrId);
793 
794 		constrForm->conislocal = false;
795 		constrForm->coninhcount++;
796 		constrForm->conparentid = parentConstrId;
797 
798 		CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
799 
800 		ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
801 
802 		ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
803 		recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_PRI);
804 
805 		ObjectAddressSet(referenced, RelationRelationId, childTableId);
806 		recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_SEC);
807 	}
808 	else
809 	{
810 		constrForm->coninhcount--;
811 		constrForm->conislocal = true;
812 		constrForm->conparentid = InvalidOid;
813 
814 		/* Make sure there's no further inheritance. */
815 		Assert(constrForm->coninhcount == 0);
816 
817 		CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
818 
819 		deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
820 										ConstraintRelationId,
821 										DEPENDENCY_PARTITION_PRI);
822 		deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
823 										RelationRelationId,
824 										DEPENDENCY_PARTITION_SEC);
825 	}
826 
827 	ReleaseSysCache(tuple);
828 	table_close(constrRel, RowExclusiveLock);
829 }
830 
831 
832 /*
833  * get_relation_constraint_oid
834  *		Find a constraint on the specified relation with the specified name.
835  *		Returns constraint's OID.
836  */
837 Oid
get_relation_constraint_oid(Oid relid,const char * conname,bool missing_ok)838 get_relation_constraint_oid(Oid relid, const char *conname, bool missing_ok)
839 {
840 	Relation	pg_constraint;
841 	HeapTuple	tuple;
842 	SysScanDesc scan;
843 	ScanKeyData skey[3];
844 	Oid			conOid = InvalidOid;
845 
846 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
847 
848 	ScanKeyInit(&skey[0],
849 				Anum_pg_constraint_conrelid,
850 				BTEqualStrategyNumber, F_OIDEQ,
851 				ObjectIdGetDatum(relid));
852 	ScanKeyInit(&skey[1],
853 				Anum_pg_constraint_contypid,
854 				BTEqualStrategyNumber, F_OIDEQ,
855 				ObjectIdGetDatum(InvalidOid));
856 	ScanKeyInit(&skey[2],
857 				Anum_pg_constraint_conname,
858 				BTEqualStrategyNumber, F_NAMEEQ,
859 				CStringGetDatum(conname));
860 
861 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
862 							  NULL, 3, skey);
863 
864 	/* There can be at most one matching row */
865 	if (HeapTupleIsValid(tuple = systable_getnext(scan)))
866 		conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
867 
868 	systable_endscan(scan);
869 
870 	/* If no such constraint exists, complain */
871 	if (!OidIsValid(conOid) && !missing_ok)
872 		ereport(ERROR,
873 				(errcode(ERRCODE_UNDEFINED_OBJECT),
874 				 errmsg("constraint \"%s\" for table \"%s\" does not exist",
875 						conname, get_rel_name(relid))));
876 
877 	table_close(pg_constraint, AccessShareLock);
878 
879 	return conOid;
880 }
881 
882 /*
883  * get_relation_constraint_attnos
884  *		Find a constraint on the specified relation with the specified name
885  *		and return the constrained columns.
886  *
887  * Returns a Bitmapset of the column attnos of the constrained columns, with
888  * attnos being offset by FirstLowInvalidHeapAttributeNumber so that system
889  * columns can be represented.
890  *
891  * *constraintOid is set to the OID of the constraint, or InvalidOid on
892  * failure.
893  */
894 Bitmapset *
get_relation_constraint_attnos(Oid relid,const char * conname,bool missing_ok,Oid * constraintOid)895 get_relation_constraint_attnos(Oid relid, const char *conname,
896 							   bool missing_ok, Oid *constraintOid)
897 {
898 	Bitmapset  *conattnos = NULL;
899 	Relation	pg_constraint;
900 	HeapTuple	tuple;
901 	SysScanDesc scan;
902 	ScanKeyData skey[3];
903 
904 	/* Set *constraintOid, to avoid complaints about uninitialized vars */
905 	*constraintOid = InvalidOid;
906 
907 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
908 
909 	ScanKeyInit(&skey[0],
910 				Anum_pg_constraint_conrelid,
911 				BTEqualStrategyNumber, F_OIDEQ,
912 				ObjectIdGetDatum(relid));
913 	ScanKeyInit(&skey[1],
914 				Anum_pg_constraint_contypid,
915 				BTEqualStrategyNumber, F_OIDEQ,
916 				ObjectIdGetDatum(InvalidOid));
917 	ScanKeyInit(&skey[2],
918 				Anum_pg_constraint_conname,
919 				BTEqualStrategyNumber, F_NAMEEQ,
920 				CStringGetDatum(conname));
921 
922 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
923 							  NULL, 3, skey);
924 
925 	/* There can be at most one matching row */
926 	if (HeapTupleIsValid(tuple = systable_getnext(scan)))
927 	{
928 		Datum		adatum;
929 		bool		isNull;
930 
931 		*constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
932 
933 		/* Extract the conkey array, ie, attnums of constrained columns */
934 		adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
935 							  RelationGetDescr(pg_constraint), &isNull);
936 		if (!isNull)
937 		{
938 			ArrayType  *arr;
939 			int			numcols;
940 			int16	   *attnums;
941 			int			i;
942 
943 			arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
944 			numcols = ARR_DIMS(arr)[0];
945 			if (ARR_NDIM(arr) != 1 ||
946 				numcols < 0 ||
947 				ARR_HASNULL(arr) ||
948 				ARR_ELEMTYPE(arr) != INT2OID)
949 				elog(ERROR, "conkey is not a 1-D smallint array");
950 			attnums = (int16 *) ARR_DATA_PTR(arr);
951 
952 			/* Construct the result value */
953 			for (i = 0; i < numcols; i++)
954 			{
955 				conattnos = bms_add_member(conattnos,
956 										   attnums[i] - FirstLowInvalidHeapAttributeNumber);
957 			}
958 		}
959 	}
960 
961 	systable_endscan(scan);
962 
963 	/* If no such constraint exists, complain */
964 	if (!OidIsValid(*constraintOid) && !missing_ok)
965 		ereport(ERROR,
966 				(errcode(ERRCODE_UNDEFINED_OBJECT),
967 				 errmsg("constraint \"%s\" for table \"%s\" does not exist",
968 						conname, get_rel_name(relid))));
969 
970 	table_close(pg_constraint, AccessShareLock);
971 
972 	return conattnos;
973 }
974 
975 /*
976  * Return the OID of the constraint associated with the given index in the
977  * given relation; or InvalidOid if no such index is catalogued.
978  */
979 Oid
get_relation_idx_constraint_oid(Oid relationId,Oid indexId)980 get_relation_idx_constraint_oid(Oid relationId, Oid indexId)
981 {
982 	Relation	pg_constraint;
983 	SysScanDesc scan;
984 	ScanKeyData key;
985 	HeapTuple	tuple;
986 	Oid			constraintId = InvalidOid;
987 
988 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
989 
990 	ScanKeyInit(&key,
991 				Anum_pg_constraint_conrelid,
992 				BTEqualStrategyNumber,
993 				F_OIDEQ,
994 				ObjectIdGetDatum(relationId));
995 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
996 							  true, NULL, 1, &key);
997 	while ((tuple = systable_getnext(scan)) != NULL)
998 	{
999 		Form_pg_constraint constrForm;
1000 
1001 		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
1002 		if (constrForm->conindid == indexId)
1003 		{
1004 			constraintId = constrForm->oid;
1005 			break;
1006 		}
1007 	}
1008 	systable_endscan(scan);
1009 
1010 	table_close(pg_constraint, AccessShareLock);
1011 	return constraintId;
1012 }
1013 
1014 /*
1015  * get_domain_constraint_oid
1016  *		Find a constraint on the specified domain with the specified name.
1017  *		Returns constraint's OID.
1018  */
1019 Oid
get_domain_constraint_oid(Oid typid,const char * conname,bool missing_ok)1020 get_domain_constraint_oid(Oid typid, const char *conname, bool missing_ok)
1021 {
1022 	Relation	pg_constraint;
1023 	HeapTuple	tuple;
1024 	SysScanDesc scan;
1025 	ScanKeyData skey[3];
1026 	Oid			conOid = InvalidOid;
1027 
1028 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1029 
1030 	ScanKeyInit(&skey[0],
1031 				Anum_pg_constraint_conrelid,
1032 				BTEqualStrategyNumber, F_OIDEQ,
1033 				ObjectIdGetDatum(InvalidOid));
1034 	ScanKeyInit(&skey[1],
1035 				Anum_pg_constraint_contypid,
1036 				BTEqualStrategyNumber, F_OIDEQ,
1037 				ObjectIdGetDatum(typid));
1038 	ScanKeyInit(&skey[2],
1039 				Anum_pg_constraint_conname,
1040 				BTEqualStrategyNumber, F_NAMEEQ,
1041 				CStringGetDatum(conname));
1042 
1043 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1044 							  NULL, 3, skey);
1045 
1046 	/* There can be at most one matching row */
1047 	if (HeapTupleIsValid(tuple = systable_getnext(scan)))
1048 		conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1049 
1050 	systable_endscan(scan);
1051 
1052 	/* If no such constraint exists, complain */
1053 	if (!OidIsValid(conOid) && !missing_ok)
1054 		ereport(ERROR,
1055 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1056 				 errmsg("constraint \"%s\" for domain %s does not exist",
1057 						conname, format_type_be(typid))));
1058 
1059 	table_close(pg_constraint, AccessShareLock);
1060 
1061 	return conOid;
1062 }
1063 
1064 /*
1065  * get_primary_key_attnos
1066  *		Identify the columns in a relation's primary key, if any.
1067  *
1068  * Returns a Bitmapset of the column attnos of the primary key's columns,
1069  * with attnos being offset by FirstLowInvalidHeapAttributeNumber so that
1070  * system columns can be represented.
1071  *
1072  * If there is no primary key, return NULL.  We also return NULL if the pkey
1073  * constraint is deferrable and deferrableOk is false.
1074  *
1075  * *constraintOid is set to the OID of the pkey constraint, or InvalidOid
1076  * on failure.
1077  */
1078 Bitmapset *
get_primary_key_attnos(Oid relid,bool deferrableOk,Oid * constraintOid)1079 get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid)
1080 {
1081 	Bitmapset  *pkattnos = NULL;
1082 	Relation	pg_constraint;
1083 	HeapTuple	tuple;
1084 	SysScanDesc scan;
1085 	ScanKeyData skey[1];
1086 
1087 	/* Set *constraintOid, to avoid complaints about uninitialized vars */
1088 	*constraintOid = InvalidOid;
1089 
1090 	/* Scan pg_constraint for constraints of the target rel */
1091 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1092 
1093 	ScanKeyInit(&skey[0],
1094 				Anum_pg_constraint_conrelid,
1095 				BTEqualStrategyNumber, F_OIDEQ,
1096 				ObjectIdGetDatum(relid));
1097 
1098 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1099 							  NULL, 1, skey);
1100 
1101 	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1102 	{
1103 		Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
1104 		Datum		adatum;
1105 		bool		isNull;
1106 		ArrayType  *arr;
1107 		int16	   *attnums;
1108 		int			numkeys;
1109 		int			i;
1110 
1111 		/* Skip constraints that are not PRIMARY KEYs */
1112 		if (con->contype != CONSTRAINT_PRIMARY)
1113 			continue;
1114 
1115 		/*
1116 		 * If the primary key is deferrable, but we've been instructed to
1117 		 * ignore deferrable constraints, then we might as well give up
1118 		 * searching, since there can only be a single primary key on a table.
1119 		 */
1120 		if (con->condeferrable && !deferrableOk)
1121 			break;
1122 
1123 		/* Extract the conkey array, ie, attnums of PK's columns */
1124 		adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
1125 							  RelationGetDescr(pg_constraint), &isNull);
1126 		if (isNull)
1127 			elog(ERROR, "null conkey for constraint %u",
1128 				 ((Form_pg_constraint) GETSTRUCT(tuple))->oid);
1129 		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1130 		numkeys = ARR_DIMS(arr)[0];
1131 		if (ARR_NDIM(arr) != 1 ||
1132 			numkeys < 0 ||
1133 			ARR_HASNULL(arr) ||
1134 			ARR_ELEMTYPE(arr) != INT2OID)
1135 			elog(ERROR, "conkey is not a 1-D smallint array");
1136 		attnums = (int16 *) ARR_DATA_PTR(arr);
1137 
1138 		/* Construct the result value */
1139 		for (i = 0; i < numkeys; i++)
1140 		{
1141 			pkattnos = bms_add_member(pkattnos,
1142 									  attnums[i] - FirstLowInvalidHeapAttributeNumber);
1143 		}
1144 		*constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1145 
1146 		/* No need to search further */
1147 		break;
1148 	}
1149 
1150 	systable_endscan(scan);
1151 
1152 	table_close(pg_constraint, AccessShareLock);
1153 
1154 	return pkattnos;
1155 }
1156 
1157 /*
1158  * Extract data from the pg_constraint tuple of a foreign-key constraint.
1159  *
1160  * All arguments save the first are output arguments; the last three of them
1161  * can be passed as NULL if caller doesn't need them.
1162  */
1163 void
DeconstructFkConstraintRow(HeapTuple tuple,int * numfks,AttrNumber * conkey,AttrNumber * confkey,Oid * pf_eq_oprs,Oid * pp_eq_oprs,Oid * ff_eq_oprs)1164 DeconstructFkConstraintRow(HeapTuple tuple, int *numfks,
1165 						   AttrNumber *conkey, AttrNumber *confkey,
1166 						   Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs)
1167 {
1168 	Oid			constrId;
1169 	Datum		adatum;
1170 	bool		isNull;
1171 	ArrayType  *arr;
1172 	int			numkeys;
1173 
1174 	constrId = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1175 
1176 	/*
1177 	 * We expect the arrays to be 1-D arrays of the right types; verify that.
1178 	 * We don't need to use deconstruct_array() since the array data is just
1179 	 * going to look like a C array of values.
1180 	 */
1181 	adatum = SysCacheGetAttr(CONSTROID, tuple,
1182 							 Anum_pg_constraint_conkey, &isNull);
1183 	if (isNull)
1184 		elog(ERROR, "null conkey for constraint %u", constrId);
1185 	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1186 	if (ARR_NDIM(arr) != 1 ||
1187 		ARR_HASNULL(arr) ||
1188 		ARR_ELEMTYPE(arr) != INT2OID)
1189 		elog(ERROR, "conkey is not a 1-D smallint array");
1190 	numkeys = ARR_DIMS(arr)[0];
1191 	if (numkeys <= 0 || numkeys > INDEX_MAX_KEYS)
1192 		elog(ERROR, "foreign key constraint cannot have %d columns", numkeys);
1193 	memcpy(conkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1194 	if ((Pointer) arr != DatumGetPointer(adatum))
1195 		pfree(arr);				/* free de-toasted copy, if any */
1196 
1197 	adatum = SysCacheGetAttr(CONSTROID, tuple,
1198 							 Anum_pg_constraint_confkey, &isNull);
1199 	if (isNull)
1200 		elog(ERROR, "null confkey for constraint %u", constrId);
1201 	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1202 	if (ARR_NDIM(arr) != 1 ||
1203 		ARR_DIMS(arr)[0] != numkeys ||
1204 		ARR_HASNULL(arr) ||
1205 		ARR_ELEMTYPE(arr) != INT2OID)
1206 		elog(ERROR, "confkey is not a 1-D smallint array");
1207 	memcpy(confkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1208 	if ((Pointer) arr != DatumGetPointer(adatum))
1209 		pfree(arr);				/* free de-toasted copy, if any */
1210 
1211 	if (pf_eq_oprs)
1212 	{
1213 		adatum = SysCacheGetAttr(CONSTROID, tuple,
1214 								 Anum_pg_constraint_conpfeqop, &isNull);
1215 		if (isNull)
1216 			elog(ERROR, "null conpfeqop for constraint %u", constrId);
1217 		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1218 		/* see TryReuseForeignKey if you change the test below */
1219 		if (ARR_NDIM(arr) != 1 ||
1220 			ARR_DIMS(arr)[0] != numkeys ||
1221 			ARR_HASNULL(arr) ||
1222 			ARR_ELEMTYPE(arr) != OIDOID)
1223 			elog(ERROR, "conpfeqop is not a 1-D Oid array");
1224 		memcpy(pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1225 		if ((Pointer) arr != DatumGetPointer(adatum))
1226 			pfree(arr);			/* free de-toasted copy, if any */
1227 	}
1228 
1229 	if (pp_eq_oprs)
1230 	{
1231 		adatum = SysCacheGetAttr(CONSTROID, tuple,
1232 								 Anum_pg_constraint_conppeqop, &isNull);
1233 		if (isNull)
1234 			elog(ERROR, "null conppeqop for constraint %u", constrId);
1235 		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1236 		if (ARR_NDIM(arr) != 1 ||
1237 			ARR_DIMS(arr)[0] != numkeys ||
1238 			ARR_HASNULL(arr) ||
1239 			ARR_ELEMTYPE(arr) != OIDOID)
1240 			elog(ERROR, "conppeqop is not a 1-D Oid array");
1241 		memcpy(pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1242 		if ((Pointer) arr != DatumGetPointer(adatum))
1243 			pfree(arr);			/* free de-toasted copy, if any */
1244 	}
1245 
1246 	if (ff_eq_oprs)
1247 	{
1248 		adatum = SysCacheGetAttr(CONSTROID, tuple,
1249 								 Anum_pg_constraint_conffeqop, &isNull);
1250 		if (isNull)
1251 			elog(ERROR, "null conffeqop for constraint %u", constrId);
1252 		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1253 		if (ARR_NDIM(arr) != 1 ||
1254 			ARR_DIMS(arr)[0] != numkeys ||
1255 			ARR_HASNULL(arr) ||
1256 			ARR_ELEMTYPE(arr) != OIDOID)
1257 			elog(ERROR, "conffeqop is not a 1-D Oid array");
1258 		memcpy(ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1259 		if ((Pointer) arr != DatumGetPointer(adatum))
1260 			pfree(arr);			/* free de-toasted copy, if any */
1261 	}
1262 
1263 	*numfks = numkeys;
1264 }
1265 
1266 /*
1267  * Determine whether a relation can be proven functionally dependent on
1268  * a set of grouping columns.  If so, return true and add the pg_constraint
1269  * OIDs of the constraints needed for the proof to the *constraintDeps list.
1270  *
1271  * grouping_columns is a list of grouping expressions, in which columns of
1272  * the rel of interest are Vars with the indicated varno/varlevelsup.
1273  *
1274  * Currently we only check to see if the rel has a primary key that is a
1275  * subset of the grouping_columns.  We could also use plain unique constraints
1276  * if all their columns are known not null, but there's a problem: we need
1277  * to be able to represent the not-null-ness as part of the constraints added
1278  * to *constraintDeps.  FIXME whenever not-null constraints get represented
1279  * in pg_constraint.
1280  */
1281 bool
check_functional_grouping(Oid relid,Index varno,Index varlevelsup,List * grouping_columns,List ** constraintDeps)1282 check_functional_grouping(Oid relid,
1283 						  Index varno, Index varlevelsup,
1284 						  List *grouping_columns,
1285 						  List **constraintDeps)
1286 {
1287 	Bitmapset  *pkattnos;
1288 	Bitmapset  *groupbyattnos;
1289 	Oid			constraintOid;
1290 	ListCell   *gl;
1291 
1292 	/* If the rel has no PK, then we can't prove functional dependency */
1293 	pkattnos = get_primary_key_attnos(relid, false, &constraintOid);
1294 	if (pkattnos == NULL)
1295 		return false;
1296 
1297 	/* Identify all the rel's columns that appear in grouping_columns */
1298 	groupbyattnos = NULL;
1299 	foreach(gl, grouping_columns)
1300 	{
1301 		Var		   *gvar = (Var *) lfirst(gl);
1302 
1303 		if (IsA(gvar, Var) &&
1304 			gvar->varno == varno &&
1305 			gvar->varlevelsup == varlevelsup)
1306 			groupbyattnos = bms_add_member(groupbyattnos,
1307 										   gvar->varattno - FirstLowInvalidHeapAttributeNumber);
1308 	}
1309 
1310 	if (bms_is_subset(pkattnos, groupbyattnos))
1311 	{
1312 		/* The PK is a subset of grouping_columns, so we win */
1313 		*constraintDeps = lappend_oid(*constraintDeps, constraintOid);
1314 		return true;
1315 	}
1316 
1317 	return false;
1318 }
1319