1 /*-------------------------------------------------------------------------
2  *
3  * pg_constraint.c
4  *	  routines to support manipulation of the pg_constraint relation
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/catalog/pg_constraint.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/sysattr.h"
20 #include "access/table.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/indexing.h"
25 #include "catalog/objectaccess.h"
26 #include "catalog/pg_constraint.h"
27 #include "catalog/pg_operator.h"
28 #include "catalog/pg_type.h"
29 #include "commands/defrem.h"
30 #include "commands/tablecmds.h"
31 #include "utils/array.h"
32 #include "utils/builtins.h"
33 #include "utils/fmgroids.h"
34 #include "utils/lsyscache.h"
35 #include "utils/rel.h"
36 #include "utils/syscache.h"
37 
38 
39 /*
40  * CreateConstraintEntry
41  *	Create a constraint table entry.
42  *
43  * Subsidiary records (such as triggers or indexes to implement the
44  * constraint) are *not* created here.  But we do make dependency links
45  * from the constraint to the things it depends on.
46  *
47  * The new constraint's OID is returned.
48  */
49 Oid
CreateConstraintEntry(const char * constraintName,Oid constraintNamespace,char constraintType,bool isDeferrable,bool isDeferred,bool isValidated,Oid parentConstrId,Oid relId,const int16 * constraintKey,int constraintNKeys,int constraintNTotalKeys,Oid domainId,Oid indexRelId,Oid foreignRelId,const int16 * foreignKey,const Oid * pfEqOp,const Oid * ppEqOp,const Oid * ffEqOp,int foreignNKeys,char foreignUpdateType,char foreignDeleteType,char foreignMatchType,const Oid * exclOp,Node * conExpr,const char * conBin,bool conIsLocal,int conInhCount,bool conNoInherit,bool is_internal)50 CreateConstraintEntry(const char *constraintName,
51 					  Oid constraintNamespace,
52 					  char constraintType,
53 					  bool isDeferrable,
54 					  bool isDeferred,
55 					  bool isValidated,
56 					  Oid parentConstrId,
57 					  Oid relId,
58 					  const int16 *constraintKey,
59 					  int constraintNKeys,
60 					  int constraintNTotalKeys,
61 					  Oid domainId,
62 					  Oid indexRelId,
63 					  Oid foreignRelId,
64 					  const int16 *foreignKey,
65 					  const Oid *pfEqOp,
66 					  const Oid *ppEqOp,
67 					  const Oid *ffEqOp,
68 					  int foreignNKeys,
69 					  char foreignUpdateType,
70 					  char foreignDeleteType,
71 					  char foreignMatchType,
72 					  const Oid *exclOp,
73 					  Node *conExpr,
74 					  const char *conBin,
75 					  bool conIsLocal,
76 					  int conInhCount,
77 					  bool conNoInherit,
78 					  bool is_internal)
79 {
80 	Relation	conDesc;
81 	Oid			conOid;
82 	HeapTuple	tup;
83 	bool		nulls[Natts_pg_constraint];
84 	Datum		values[Natts_pg_constraint];
85 	ArrayType  *conkeyArray;
86 	ArrayType  *confkeyArray;
87 	ArrayType  *conpfeqopArray;
88 	ArrayType  *conppeqopArray;
89 	ArrayType  *conffeqopArray;
90 	ArrayType  *conexclopArray;
91 	NameData	cname;
92 	int			i;
93 	ObjectAddress conobject;
94 
95 	conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
96 
97 	Assert(constraintName);
98 	namestrcpy(&cname, constraintName);
99 
100 	/*
101 	 * Convert C arrays into Postgres arrays.
102 	 */
103 	if (constraintNKeys > 0)
104 	{
105 		Datum	   *conkey;
106 
107 		conkey = (Datum *) palloc(constraintNKeys * sizeof(Datum));
108 		for (i = 0; i < constraintNKeys; i++)
109 			conkey[i] = Int16GetDatum(constraintKey[i]);
110 		conkeyArray = construct_array(conkey, constraintNKeys,
111 									  INT2OID, 2, true, TYPALIGN_SHORT);
112 	}
113 	else
114 		conkeyArray = NULL;
115 
116 	if (foreignNKeys > 0)
117 	{
118 		Datum	   *fkdatums;
119 
120 		fkdatums = (Datum *) palloc(foreignNKeys * sizeof(Datum));
121 		for (i = 0; i < foreignNKeys; i++)
122 			fkdatums[i] = Int16GetDatum(foreignKey[i]);
123 		confkeyArray = construct_array(fkdatums, foreignNKeys,
124 									   INT2OID, 2, true, TYPALIGN_SHORT);
125 		for (i = 0; i < foreignNKeys; i++)
126 			fkdatums[i] = ObjectIdGetDatum(pfEqOp[i]);
127 		conpfeqopArray = construct_array(fkdatums, foreignNKeys,
128 										 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
129 		for (i = 0; i < foreignNKeys; i++)
130 			fkdatums[i] = ObjectIdGetDatum(ppEqOp[i]);
131 		conppeqopArray = construct_array(fkdatums, foreignNKeys,
132 										 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
133 		for (i = 0; i < foreignNKeys; i++)
134 			fkdatums[i] = ObjectIdGetDatum(ffEqOp[i]);
135 		conffeqopArray = construct_array(fkdatums, foreignNKeys,
136 										 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
137 	}
138 	else
139 	{
140 		confkeyArray = NULL;
141 		conpfeqopArray = NULL;
142 		conppeqopArray = NULL;
143 		conffeqopArray = NULL;
144 	}
145 
146 	if (exclOp != NULL)
147 	{
148 		Datum	   *opdatums;
149 
150 		opdatums = (Datum *) palloc(constraintNKeys * sizeof(Datum));
151 		for (i = 0; i < constraintNKeys; i++)
152 			opdatums[i] = ObjectIdGetDatum(exclOp[i]);
153 		conexclopArray = construct_array(opdatums, constraintNKeys,
154 										 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
155 	}
156 	else
157 		conexclopArray = NULL;
158 
159 	/* initialize nulls and values */
160 	for (i = 0; i < Natts_pg_constraint; i++)
161 	{
162 		nulls[i] = false;
163 		values[i] = (Datum) NULL;
164 	}
165 
166 	conOid = GetNewOidWithIndex(conDesc, ConstraintOidIndexId,
167 								Anum_pg_constraint_oid);
168 	values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(conOid);
169 	values[Anum_pg_constraint_conname - 1] = NameGetDatum(&cname);
170 	values[Anum_pg_constraint_connamespace - 1] = ObjectIdGetDatum(constraintNamespace);
171 	values[Anum_pg_constraint_contype - 1] = CharGetDatum(constraintType);
172 	values[Anum_pg_constraint_condeferrable - 1] = BoolGetDatum(isDeferrable);
173 	values[Anum_pg_constraint_condeferred - 1] = BoolGetDatum(isDeferred);
174 	values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValidated);
175 	values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(relId);
176 	values[Anum_pg_constraint_contypid - 1] = ObjectIdGetDatum(domainId);
177 	values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(indexRelId);
178 	values[Anum_pg_constraint_conparentid - 1] = ObjectIdGetDatum(parentConstrId);
179 	values[Anum_pg_constraint_confrelid - 1] = ObjectIdGetDatum(foreignRelId);
180 	values[Anum_pg_constraint_confupdtype - 1] = CharGetDatum(foreignUpdateType);
181 	values[Anum_pg_constraint_confdeltype - 1] = CharGetDatum(foreignDeleteType);
182 	values[Anum_pg_constraint_confmatchtype - 1] = CharGetDatum(foreignMatchType);
183 	values[Anum_pg_constraint_conislocal - 1] = BoolGetDatum(conIsLocal);
184 	values[Anum_pg_constraint_coninhcount - 1] = Int32GetDatum(conInhCount);
185 	values[Anum_pg_constraint_connoinherit - 1] = BoolGetDatum(conNoInherit);
186 
187 	if (conkeyArray)
188 		values[Anum_pg_constraint_conkey - 1] = PointerGetDatum(conkeyArray);
189 	else
190 		nulls[Anum_pg_constraint_conkey - 1] = true;
191 
192 	if (confkeyArray)
193 		values[Anum_pg_constraint_confkey - 1] = PointerGetDatum(confkeyArray);
194 	else
195 		nulls[Anum_pg_constraint_confkey - 1] = true;
196 
197 	if (conpfeqopArray)
198 		values[Anum_pg_constraint_conpfeqop - 1] = PointerGetDatum(conpfeqopArray);
199 	else
200 		nulls[Anum_pg_constraint_conpfeqop - 1] = true;
201 
202 	if (conppeqopArray)
203 		values[Anum_pg_constraint_conppeqop - 1] = PointerGetDatum(conppeqopArray);
204 	else
205 		nulls[Anum_pg_constraint_conppeqop - 1] = true;
206 
207 	if (conffeqopArray)
208 		values[Anum_pg_constraint_conffeqop - 1] = PointerGetDatum(conffeqopArray);
209 	else
210 		nulls[Anum_pg_constraint_conffeqop - 1] = true;
211 
212 	if (conexclopArray)
213 		values[Anum_pg_constraint_conexclop - 1] = PointerGetDatum(conexclopArray);
214 	else
215 		nulls[Anum_pg_constraint_conexclop - 1] = true;
216 
217 	if (conBin)
218 		values[Anum_pg_constraint_conbin - 1] = CStringGetTextDatum(conBin);
219 	else
220 		nulls[Anum_pg_constraint_conbin - 1] = true;
221 
222 	tup = heap_form_tuple(RelationGetDescr(conDesc), values, nulls);
223 
224 	CatalogTupleInsert(conDesc, tup);
225 
226 	conobject.classId = ConstraintRelationId;
227 	conobject.objectId = conOid;
228 	conobject.objectSubId = 0;
229 
230 	table_close(conDesc, RowExclusiveLock);
231 
232 	if (OidIsValid(relId))
233 	{
234 		/*
235 		 * Register auto dependency from constraint to owning relation, or to
236 		 * specific column(s) if any are mentioned.
237 		 */
238 		ObjectAddress relobject;
239 
240 		relobject.classId = RelationRelationId;
241 		relobject.objectId = relId;
242 		if (constraintNTotalKeys > 0)
243 		{
244 			for (i = 0; i < constraintNTotalKeys; i++)
245 			{
246 				relobject.objectSubId = constraintKey[i];
247 
248 				recordDependencyOn(&conobject, &relobject, DEPENDENCY_AUTO);
249 			}
250 		}
251 		else
252 		{
253 			relobject.objectSubId = 0;
254 
255 			recordDependencyOn(&conobject, &relobject, DEPENDENCY_AUTO);
256 		}
257 	}
258 
259 	if (OidIsValid(domainId))
260 	{
261 		/*
262 		 * Register auto dependency from constraint to owning domain
263 		 */
264 		ObjectAddress domobject;
265 
266 		domobject.classId = TypeRelationId;
267 		domobject.objectId = domainId;
268 		domobject.objectSubId = 0;
269 
270 		recordDependencyOn(&conobject, &domobject, DEPENDENCY_AUTO);
271 	}
272 
273 	if (OidIsValid(foreignRelId))
274 	{
275 		/*
276 		 * Register normal dependency from constraint to foreign relation, or
277 		 * to specific column(s) if any are mentioned.
278 		 */
279 		ObjectAddress relobject;
280 
281 		relobject.classId = RelationRelationId;
282 		relobject.objectId = foreignRelId;
283 		if (foreignNKeys > 0)
284 		{
285 			for (i = 0; i < foreignNKeys; i++)
286 			{
287 				relobject.objectSubId = foreignKey[i];
288 
289 				recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
290 			}
291 		}
292 		else
293 		{
294 			relobject.objectSubId = 0;
295 
296 			recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
297 		}
298 	}
299 
300 	if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN)
301 	{
302 		/*
303 		 * Register normal dependency on the unique index that supports a
304 		 * foreign-key constraint.  (Note: for indexes associated with unique
305 		 * or primary-key constraints, the dependency runs the other way, and
306 		 * is not made here.)
307 		 */
308 		ObjectAddress relobject;
309 
310 		relobject.classId = RelationRelationId;
311 		relobject.objectId = indexRelId;
312 		relobject.objectSubId = 0;
313 
314 		recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
315 	}
316 
317 	if (foreignNKeys > 0)
318 	{
319 		/*
320 		 * Register normal dependencies on the equality operators that support
321 		 * a foreign-key constraint.  If the PK and FK types are the same then
322 		 * all three operators for a column are the same; otherwise they are
323 		 * different.
324 		 */
325 		ObjectAddress oprobject;
326 
327 		oprobject.classId = OperatorRelationId;
328 		oprobject.objectSubId = 0;
329 
330 		for (i = 0; i < foreignNKeys; i++)
331 		{
332 			oprobject.objectId = pfEqOp[i];
333 			recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
334 			if (ppEqOp[i] != pfEqOp[i])
335 			{
336 				oprobject.objectId = ppEqOp[i];
337 				recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
338 			}
339 			if (ffEqOp[i] != pfEqOp[i])
340 			{
341 				oprobject.objectId = ffEqOp[i];
342 				recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
343 			}
344 		}
345 	}
346 
347 	/*
348 	 * We don't bother to register dependencies on the exclusion operators of
349 	 * an exclusion constraint.  We assume they are members of the opclass
350 	 * supporting the index, so there's an indirect dependency via that. (This
351 	 * would be pretty dicey for cross-type operators, but exclusion operators
352 	 * can never be cross-type.)
353 	 */
354 
355 	if (conExpr != NULL)
356 	{
357 		/*
358 		 * Register dependencies from constraint to objects mentioned in CHECK
359 		 * expression.
360 		 */
361 		recordDependencyOnSingleRelExpr(&conobject, conExpr, relId,
362 										DEPENDENCY_NORMAL,
363 										DEPENDENCY_NORMAL, false);
364 	}
365 
366 	/* Post creation hook for new constraint */
367 	InvokeObjectPostCreateHookArg(ConstraintRelationId, conOid, 0,
368 								  is_internal);
369 
370 	return conOid;
371 }
372 
373 /*
374  * Test whether given name is currently used as a constraint name
375  * for the given object (relation or domain).
376  *
377  * This is used to decide whether to accept a user-specified constraint name.
378  * It is deliberately not the same test as ChooseConstraintName uses to decide
379  * whether an auto-generated name is OK: here, we will allow it unless there
380  * is an identical constraint name in use *on the same object*.
381  *
382  * NB: Caller should hold exclusive lock on the given object, else
383  * this test can be fooled by concurrent additions.
384  */
385 bool
ConstraintNameIsUsed(ConstraintCategory conCat,Oid objId,const char * conname)386 ConstraintNameIsUsed(ConstraintCategory conCat, Oid objId,
387 					 const char *conname)
388 {
389 	bool		found;
390 	Relation	conDesc;
391 	SysScanDesc conscan;
392 	ScanKeyData skey[3];
393 
394 	conDesc = table_open(ConstraintRelationId, AccessShareLock);
395 
396 	ScanKeyInit(&skey[0],
397 				Anum_pg_constraint_conrelid,
398 				BTEqualStrategyNumber, F_OIDEQ,
399 				ObjectIdGetDatum((conCat == CONSTRAINT_RELATION)
400 								 ? objId : InvalidOid));
401 	ScanKeyInit(&skey[1],
402 				Anum_pg_constraint_contypid,
403 				BTEqualStrategyNumber, F_OIDEQ,
404 				ObjectIdGetDatum((conCat == CONSTRAINT_DOMAIN)
405 								 ? objId : InvalidOid));
406 	ScanKeyInit(&skey[2],
407 				Anum_pg_constraint_conname,
408 				BTEqualStrategyNumber, F_NAMEEQ,
409 				CStringGetDatum(conname));
410 
411 	conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId,
412 								 true, NULL, 3, skey);
413 
414 	/* There can be at most one matching row */
415 	found = (HeapTupleIsValid(systable_getnext(conscan)));
416 
417 	systable_endscan(conscan);
418 	table_close(conDesc, AccessShareLock);
419 
420 	return found;
421 }
422 
423 /*
424  * Does any constraint of the given name exist in the given namespace?
425  *
426  * This is used for code that wants to match ChooseConstraintName's rule
427  * that we should avoid autogenerating duplicate constraint names within a
428  * namespace.
429  */
430 bool
ConstraintNameExists(const char * conname,Oid namespaceid)431 ConstraintNameExists(const char *conname, Oid namespaceid)
432 {
433 	bool		found;
434 	Relation	conDesc;
435 	SysScanDesc conscan;
436 	ScanKeyData skey[2];
437 
438 	conDesc = table_open(ConstraintRelationId, AccessShareLock);
439 
440 	ScanKeyInit(&skey[0],
441 				Anum_pg_constraint_conname,
442 				BTEqualStrategyNumber, F_NAMEEQ,
443 				CStringGetDatum(conname));
444 
445 	ScanKeyInit(&skey[1],
446 				Anum_pg_constraint_connamespace,
447 				BTEqualStrategyNumber, F_OIDEQ,
448 				ObjectIdGetDatum(namespaceid));
449 
450 	conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
451 								 NULL, 2, skey);
452 
453 	found = (HeapTupleIsValid(systable_getnext(conscan)));
454 
455 	systable_endscan(conscan);
456 	table_close(conDesc, AccessShareLock);
457 
458 	return found;
459 }
460 
461 /*
462  * Select a nonconflicting name for a new constraint.
463  *
464  * The objective here is to choose a name that is unique within the
465  * specified namespace.  Postgres does not require this, but the SQL
466  * spec does, and some apps depend on it.  Therefore we avoid choosing
467  * default names that so conflict.
468  *
469  * name1, name2, and label are used the same way as for makeObjectName(),
470  * except that the label can't be NULL; digits will be appended to the label
471  * if needed to create a name that is unique within the specified namespace.
472  *
473  * 'others' can be a list of string names already chosen within the current
474  * command (but not yet reflected into the catalogs); we will not choose
475  * a duplicate of one of these either.
476  *
477  * Note: it is theoretically possible to get a collision anyway, if someone
478  * else chooses the same name concurrently.  This is fairly unlikely to be
479  * a problem in practice, especially if one is holding an exclusive lock on
480  * the relation identified by name1.
481  *
482  * Returns a palloc'd string.
483  */
484 char *
ChooseConstraintName(const char * name1,const char * name2,const char * label,Oid namespaceid,List * others)485 ChooseConstraintName(const char *name1, const char *name2,
486 					 const char *label, Oid namespaceid,
487 					 List *others)
488 {
489 	int			pass = 0;
490 	char	   *conname = NULL;
491 	char		modlabel[NAMEDATALEN];
492 	Relation	conDesc;
493 	SysScanDesc conscan;
494 	ScanKeyData skey[2];
495 	bool		found;
496 	ListCell   *l;
497 
498 	conDesc = table_open(ConstraintRelationId, AccessShareLock);
499 
500 	/* try the unmodified label first */
501 	StrNCpy(modlabel, label, sizeof(modlabel));
502 
503 	for (;;)
504 	{
505 		conname = makeObjectName(name1, name2, modlabel);
506 
507 		found = false;
508 
509 		foreach(l, others)
510 		{
511 			if (strcmp((char *) lfirst(l), conname) == 0)
512 			{
513 				found = true;
514 				break;
515 			}
516 		}
517 
518 		if (!found)
519 		{
520 			ScanKeyInit(&skey[0],
521 						Anum_pg_constraint_conname,
522 						BTEqualStrategyNumber, F_NAMEEQ,
523 						CStringGetDatum(conname));
524 
525 			ScanKeyInit(&skey[1],
526 						Anum_pg_constraint_connamespace,
527 						BTEqualStrategyNumber, F_OIDEQ,
528 						ObjectIdGetDatum(namespaceid));
529 
530 			conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
531 										 NULL, 2, skey);
532 
533 			found = (HeapTupleIsValid(systable_getnext(conscan)));
534 
535 			systable_endscan(conscan);
536 		}
537 
538 		if (!found)
539 			break;
540 
541 		/* found a conflict, so try a new name component */
542 		pfree(conname);
543 		snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
544 	}
545 
546 	table_close(conDesc, AccessShareLock);
547 
548 	return conname;
549 }
550 
551 /*
552  * Delete a single constraint record.
553  */
554 void
RemoveConstraintById(Oid conId)555 RemoveConstraintById(Oid conId)
556 {
557 	Relation	conDesc;
558 	HeapTuple	tup;
559 	Form_pg_constraint con;
560 
561 	conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
562 
563 	tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conId));
564 	if (!HeapTupleIsValid(tup)) /* should not happen */
565 		elog(ERROR, "cache lookup failed for constraint %u", conId);
566 	con = (Form_pg_constraint) GETSTRUCT(tup);
567 
568 	/*
569 	 * Special processing depending on what the constraint is for.
570 	 */
571 	if (OidIsValid(con->conrelid))
572 	{
573 		Relation	rel;
574 
575 		/*
576 		 * If the constraint is for a relation, open and exclusive-lock the
577 		 * relation it's for.
578 		 */
579 		rel = table_open(con->conrelid, AccessExclusiveLock);
580 
581 		/*
582 		 * We need to update the relchecks count if it is a check constraint
583 		 * being dropped.  This update will force backends to rebuild relcache
584 		 * entries when we commit.
585 		 */
586 		if (con->contype == CONSTRAINT_CHECK)
587 		{
588 			Relation	pgrel;
589 			HeapTuple	relTup;
590 			Form_pg_class classForm;
591 
592 			pgrel = table_open(RelationRelationId, RowExclusiveLock);
593 			relTup = SearchSysCacheCopy1(RELOID,
594 										 ObjectIdGetDatum(con->conrelid));
595 			if (!HeapTupleIsValid(relTup))
596 				elog(ERROR, "cache lookup failed for relation %u",
597 					 con->conrelid);
598 			classForm = (Form_pg_class) GETSTRUCT(relTup);
599 
600 			if (classForm->relchecks == 0)	/* should not happen */
601 				elog(ERROR, "relation \"%s\" has relchecks = 0",
602 					 RelationGetRelationName(rel));
603 			classForm->relchecks--;
604 
605 			CatalogTupleUpdate(pgrel, &relTup->t_self, relTup);
606 
607 			heap_freetuple(relTup);
608 
609 			table_close(pgrel, RowExclusiveLock);
610 		}
611 
612 		/* Keep lock on constraint's rel until end of xact */
613 		table_close(rel, NoLock);
614 	}
615 	else if (OidIsValid(con->contypid))
616 	{
617 		/*
618 		 * XXX for now, do nothing special when dropping a domain constraint
619 		 *
620 		 * Probably there should be some form of locking on the domain type,
621 		 * but we have no such concept at the moment.
622 		 */
623 	}
624 	else
625 		elog(ERROR, "constraint %u is not of a known type", conId);
626 
627 	/* Fry the constraint itself */
628 	CatalogTupleDelete(conDesc, &tup->t_self);
629 
630 	/* Clean up */
631 	ReleaseSysCache(tup);
632 	table_close(conDesc, RowExclusiveLock);
633 }
634 
635 /*
636  * RenameConstraintById
637  *		Rename a constraint.
638  *
639  * Note: this isn't intended to be a user-exposed function; it doesn't check
640  * permissions etc.  Currently this is only invoked when renaming an index
641  * that is associated with a constraint, but it's made a little more general
642  * than that with the expectation of someday having ALTER TABLE RENAME
643  * CONSTRAINT.
644  */
645 void
RenameConstraintById(Oid conId,const char * newname)646 RenameConstraintById(Oid conId, const char *newname)
647 {
648 	Relation	conDesc;
649 	HeapTuple	tuple;
650 	Form_pg_constraint con;
651 
652 	conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
653 
654 	tuple = SearchSysCacheCopy1(CONSTROID, ObjectIdGetDatum(conId));
655 	if (!HeapTupleIsValid(tuple))
656 		elog(ERROR, "cache lookup failed for constraint %u", conId);
657 	con = (Form_pg_constraint) GETSTRUCT(tuple);
658 
659 	/*
660 	 * For user-friendliness, check whether the name is already in use.
661 	 */
662 	if (OidIsValid(con->conrelid) &&
663 		ConstraintNameIsUsed(CONSTRAINT_RELATION,
664 							 con->conrelid,
665 							 newname))
666 		ereport(ERROR,
667 				(errcode(ERRCODE_DUPLICATE_OBJECT),
668 				 errmsg("constraint \"%s\" for relation \"%s\" already exists",
669 						newname, get_rel_name(con->conrelid))));
670 	if (OidIsValid(con->contypid) &&
671 		ConstraintNameIsUsed(CONSTRAINT_DOMAIN,
672 							 con->contypid,
673 							 newname))
674 		ereport(ERROR,
675 				(errcode(ERRCODE_DUPLICATE_OBJECT),
676 				 errmsg("constraint \"%s\" for domain %s already exists",
677 						newname, format_type_be(con->contypid))));
678 
679 	/* OK, do the rename --- tuple is a copy, so OK to scribble on it */
680 	namestrcpy(&(con->conname), newname);
681 
682 	CatalogTupleUpdate(conDesc, &tuple->t_self, tuple);
683 
684 	InvokeObjectPostAlterHook(ConstraintRelationId, conId, 0);
685 
686 	heap_freetuple(tuple);
687 	table_close(conDesc, RowExclusiveLock);
688 }
689 
690 /*
691  * AlterConstraintNamespaces
692  *		Find any constraints belonging to the specified object,
693  *		and move them to the specified new namespace.
694  *
695  * isType indicates whether the owning object is a type or a relation.
696  */
697 void
AlterConstraintNamespaces(Oid ownerId,Oid oldNspId,Oid newNspId,bool isType,ObjectAddresses * objsMoved)698 AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
699 						  Oid newNspId, bool isType, ObjectAddresses *objsMoved)
700 {
701 	Relation	conRel;
702 	ScanKeyData key[2];
703 	SysScanDesc scan;
704 	HeapTuple	tup;
705 
706 	conRel = table_open(ConstraintRelationId, RowExclusiveLock);
707 
708 	ScanKeyInit(&key[0],
709 				Anum_pg_constraint_conrelid,
710 				BTEqualStrategyNumber, F_OIDEQ,
711 				ObjectIdGetDatum(isType ? InvalidOid : ownerId));
712 	ScanKeyInit(&key[1],
713 				Anum_pg_constraint_contypid,
714 				BTEqualStrategyNumber, F_OIDEQ,
715 				ObjectIdGetDatum(isType ? ownerId : InvalidOid));
716 
717 	scan = systable_beginscan(conRel, ConstraintRelidTypidNameIndexId, true,
718 							  NULL, 2, key);
719 
720 	while (HeapTupleIsValid((tup = systable_getnext(scan))))
721 	{
722 		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
723 		ObjectAddress thisobj;
724 
725 		thisobj.classId = ConstraintRelationId;
726 		thisobj.objectId = conform->oid;
727 		thisobj.objectSubId = 0;
728 
729 		if (object_address_present(&thisobj, objsMoved))
730 			continue;
731 
732 		/* Don't update if the object is already part of the namespace */
733 		if (conform->connamespace == oldNspId && oldNspId != newNspId)
734 		{
735 			tup = heap_copytuple(tup);
736 			conform = (Form_pg_constraint) GETSTRUCT(tup);
737 
738 			conform->connamespace = newNspId;
739 
740 			CatalogTupleUpdate(conRel, &tup->t_self, tup);
741 
742 			/*
743 			 * Note: currently, the constraint will not have its own
744 			 * dependency on the namespace, so we don't need to do
745 			 * changeDependencyFor().
746 			 */
747 		}
748 
749 		InvokeObjectPostAlterHook(ConstraintRelationId, thisobj.objectId, 0);
750 
751 		add_exact_object_address(&thisobj, objsMoved);
752 	}
753 
754 	systable_endscan(scan);
755 
756 	table_close(conRel, RowExclusiveLock);
757 }
758 
759 /*
760  * ConstraintSetParentConstraint
761  *		Set a partition's constraint as child of its parent constraint,
762  *		or remove the linkage if parentConstrId is InvalidOid.
763  *
764  * This updates the constraint's pg_constraint row to show it as inherited, and
765  * adds PARTITION dependencies to prevent the constraint from being deleted
766  * on its own.  Alternatively, reverse that.
767  */
768 void
ConstraintSetParentConstraint(Oid childConstrId,Oid parentConstrId,Oid childTableId)769 ConstraintSetParentConstraint(Oid childConstrId,
770 							  Oid parentConstrId,
771 							  Oid childTableId)
772 {
773 	Relation	constrRel;
774 	Form_pg_constraint constrForm;
775 	HeapTuple	tuple,
776 				newtup;
777 	ObjectAddress depender;
778 	ObjectAddress referenced;
779 
780 	constrRel = table_open(ConstraintRelationId, RowExclusiveLock);
781 	tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(childConstrId));
782 	if (!HeapTupleIsValid(tuple))
783 		elog(ERROR, "cache lookup failed for constraint %u", childConstrId);
784 	newtup = heap_copytuple(tuple);
785 	constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
786 	if (OidIsValid(parentConstrId))
787 	{
788 		/* don't allow setting parent for a constraint that already has one */
789 		Assert(constrForm->coninhcount == 0);
790 		if (constrForm->conparentid != InvalidOid)
791 			elog(ERROR, "constraint %u already has a parent constraint",
792 				 childConstrId);
793 
794 		constrForm->conislocal = false;
795 		constrForm->coninhcount++;
796 		constrForm->conparentid = parentConstrId;
797 
798 		CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
799 
800 		ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
801 
802 		ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
803 		recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_PRI);
804 
805 		ObjectAddressSet(referenced, RelationRelationId, childTableId);
806 		recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_SEC);
807 	}
808 	else
809 	{
810 		constrForm->coninhcount--;
811 		constrForm->conislocal = true;
812 		constrForm->conparentid = InvalidOid;
813 
814 		/* Make sure there's no further inheritance. */
815 		Assert(constrForm->coninhcount == 0);
816 
817 		CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
818 
819 		deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
820 										ConstraintRelationId,
821 										DEPENDENCY_PARTITION_PRI);
822 		deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
823 										RelationRelationId,
824 										DEPENDENCY_PARTITION_SEC);
825 	}
826 
827 	ReleaseSysCache(tuple);
828 	table_close(constrRel, RowExclusiveLock);
829 }
830 
831 
832 /*
833  * get_relation_constraint_oid
834  *		Find a constraint on the specified relation with the specified name.
835  *		Returns constraint's OID.
836  */
837 Oid
get_relation_constraint_oid(Oid relid,const char * conname,bool missing_ok)838 get_relation_constraint_oid(Oid relid, const char *conname, bool missing_ok)
839 {
840 	Relation	pg_constraint;
841 	HeapTuple	tuple;
842 	SysScanDesc scan;
843 	ScanKeyData skey[3];
844 	Oid			conOid = InvalidOid;
845 
846 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
847 
848 	ScanKeyInit(&skey[0],
849 				Anum_pg_constraint_conrelid,
850 				BTEqualStrategyNumber, F_OIDEQ,
851 				ObjectIdGetDatum(relid));
852 	ScanKeyInit(&skey[1],
853 				Anum_pg_constraint_contypid,
854 				BTEqualStrategyNumber, F_OIDEQ,
855 				ObjectIdGetDatum(InvalidOid));
856 	ScanKeyInit(&skey[2],
857 				Anum_pg_constraint_conname,
858 				BTEqualStrategyNumber, F_NAMEEQ,
859 				CStringGetDatum(conname));
860 
861 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
862 							  NULL, 3, skey);
863 
864 	/* There can be at most one matching row */
865 	if (HeapTupleIsValid(tuple = systable_getnext(scan)))
866 		conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
867 
868 	systable_endscan(scan);
869 
870 	/* If no such constraint exists, complain */
871 	if (!OidIsValid(conOid) && !missing_ok)
872 		ereport(ERROR,
873 				(errcode(ERRCODE_UNDEFINED_OBJECT),
874 				 errmsg("constraint \"%s\" for table \"%s\" does not exist",
875 						conname, get_rel_name(relid))));
876 
877 	table_close(pg_constraint, AccessShareLock);
878 
879 	return conOid;
880 }
881 
882 /*
883  * get_relation_constraint_attnos
884  *		Find a constraint on the specified relation with the specified name
885  *		and return the constrained columns.
886  *
887  * Returns a Bitmapset of the column attnos of the constrained columns, with
888  * attnos being offset by FirstLowInvalidHeapAttributeNumber so that system
889  * columns can be represented.
890  *
891  * *constraintOid is set to the OID of the constraint, or InvalidOid on
892  * failure.
893  */
894 Bitmapset *
get_relation_constraint_attnos(Oid relid,const char * conname,bool missing_ok,Oid * constraintOid)895 get_relation_constraint_attnos(Oid relid, const char *conname,
896 							   bool missing_ok, Oid *constraintOid)
897 {
898 	Bitmapset  *conattnos = NULL;
899 	Relation	pg_constraint;
900 	HeapTuple	tuple;
901 	SysScanDesc scan;
902 	ScanKeyData skey[3];
903 
904 	/* Set *constraintOid, to avoid complaints about uninitialized vars */
905 	*constraintOid = InvalidOid;
906 
907 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
908 
909 	ScanKeyInit(&skey[0],
910 				Anum_pg_constraint_conrelid,
911 				BTEqualStrategyNumber, F_OIDEQ,
912 				ObjectIdGetDatum(relid));
913 	ScanKeyInit(&skey[1],
914 				Anum_pg_constraint_contypid,
915 				BTEqualStrategyNumber, F_OIDEQ,
916 				ObjectIdGetDatum(InvalidOid));
917 	ScanKeyInit(&skey[2],
918 				Anum_pg_constraint_conname,
919 				BTEqualStrategyNumber, F_NAMEEQ,
920 				CStringGetDatum(conname));
921 
922 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
923 							  NULL, 3, skey);
924 
925 	/* There can be at most one matching row */
926 	if (HeapTupleIsValid(tuple = systable_getnext(scan)))
927 	{
928 		Datum		adatum;
929 		bool		isNull;
930 
931 		*constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
932 
933 		/* Extract the conkey array, ie, attnums of constrained columns */
934 		adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
935 							  RelationGetDescr(pg_constraint), &isNull);
936 		if (!isNull)
937 		{
938 			ArrayType  *arr;
939 			int			numcols;
940 			int16	   *attnums;
941 			int			i;
942 
943 			arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
944 			numcols = ARR_DIMS(arr)[0];
945 			if (ARR_NDIM(arr) != 1 ||
946 				numcols < 0 ||
947 				ARR_HASNULL(arr) ||
948 				ARR_ELEMTYPE(arr) != INT2OID)
949 				elog(ERROR, "conkey is not a 1-D smallint array");
950 			attnums = (int16 *) ARR_DATA_PTR(arr);
951 
952 			/* Construct the result value */
953 			for (i = 0; i < numcols; i++)
954 			{
955 				conattnos = bms_add_member(conattnos,
956 										   attnums[i] - FirstLowInvalidHeapAttributeNumber);
957 			}
958 		}
959 	}
960 
961 	systable_endscan(scan);
962 
963 	/* If no such constraint exists, complain */
964 	if (!OidIsValid(*constraintOid) && !missing_ok)
965 		ereport(ERROR,
966 				(errcode(ERRCODE_UNDEFINED_OBJECT),
967 				 errmsg("constraint \"%s\" for table \"%s\" does not exist",
968 						conname, get_rel_name(relid))));
969 
970 	table_close(pg_constraint, AccessShareLock);
971 
972 	return conattnos;
973 }
974 
975 /*
976  * Return the OID of the constraint associated with the given index in the
977  * given relation; or InvalidOid if no such index is catalogued.
978  */
979 Oid
get_relation_idx_constraint_oid(Oid relationId,Oid indexId)980 get_relation_idx_constraint_oid(Oid relationId, Oid indexId)
981 {
982 	Relation	pg_constraint;
983 	SysScanDesc scan;
984 	ScanKeyData key;
985 	HeapTuple	tuple;
986 	Oid			constraintId = InvalidOid;
987 
988 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
989 
990 	ScanKeyInit(&key,
991 				Anum_pg_constraint_conrelid,
992 				BTEqualStrategyNumber,
993 				F_OIDEQ,
994 				ObjectIdGetDatum(relationId));
995 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
996 							  true, NULL, 1, &key);
997 	while ((tuple = systable_getnext(scan)) != NULL)
998 	{
999 		Form_pg_constraint constrForm;
1000 
1001 		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
1002 		if (constrForm->conindid == indexId)
1003 		{
1004 			constraintId = constrForm->oid;
1005 			break;
1006 		}
1007 	}
1008 	systable_endscan(scan);
1009 
1010 	table_close(pg_constraint, AccessShareLock);
1011 	return constraintId;
1012 }
1013 
1014 /*
1015  * get_domain_constraint_oid
1016  *		Find a constraint on the specified domain with the specified name.
1017  *		Returns constraint's OID.
1018  */
1019 Oid
get_domain_constraint_oid(Oid typid,const char * conname,bool missing_ok)1020 get_domain_constraint_oid(Oid typid, const char *conname, bool missing_ok)
1021 {
1022 	Relation	pg_constraint;
1023 	HeapTuple	tuple;
1024 	SysScanDesc scan;
1025 	ScanKeyData skey[3];
1026 	Oid			conOid = InvalidOid;
1027 
1028 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1029 
1030 	ScanKeyInit(&skey[0],
1031 				Anum_pg_constraint_conrelid,
1032 				BTEqualStrategyNumber, F_OIDEQ,
1033 				ObjectIdGetDatum(InvalidOid));
1034 	ScanKeyInit(&skey[1],
1035 				Anum_pg_constraint_contypid,
1036 				BTEqualStrategyNumber, F_OIDEQ,
1037 				ObjectIdGetDatum(typid));
1038 	ScanKeyInit(&skey[2],
1039 				Anum_pg_constraint_conname,
1040 				BTEqualStrategyNumber, F_NAMEEQ,
1041 				CStringGetDatum(conname));
1042 
1043 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1044 							  NULL, 3, skey);
1045 
1046 	/* There can be at most one matching row */
1047 	if (HeapTupleIsValid(tuple = systable_getnext(scan)))
1048 		conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1049 
1050 	systable_endscan(scan);
1051 
1052 	/* If no such constraint exists, complain */
1053 	if (!OidIsValid(conOid) && !missing_ok)
1054 		ereport(ERROR,
1055 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1056 				 errmsg("constraint \"%s\" for domain %s does not exist",
1057 						conname, format_type_be(typid))));
1058 
1059 	table_close(pg_constraint, AccessShareLock);
1060 
1061 	return conOid;
1062 }
1063 
1064 /*
1065  * get_primary_key_attnos
1066  *		Identify the columns in a relation's primary key, if any.
1067  *
1068  * Returns a Bitmapset of the column attnos of the primary key's columns,
1069  * with attnos being offset by FirstLowInvalidHeapAttributeNumber so that
1070  * system columns can be represented.
1071  *
1072  * If there is no primary key, return NULL.  We also return NULL if the pkey
1073  * constraint is deferrable and deferrableOk is false.
1074  *
1075  * *constraintOid is set to the OID of the pkey constraint, or InvalidOid
1076  * on failure.
1077  */
1078 Bitmapset *
get_primary_key_attnos(Oid relid,bool deferrableOk,Oid * constraintOid)1079 get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid)
1080 {
1081 	Bitmapset  *pkattnos = NULL;
1082 	Relation	pg_constraint;
1083 	HeapTuple	tuple;
1084 	SysScanDesc scan;
1085 	ScanKeyData skey[1];
1086 
1087 	/* Set *constraintOid, to avoid complaints about uninitialized vars */
1088 	*constraintOid = InvalidOid;
1089 
1090 	/* Scan pg_constraint for constraints of the target rel */
1091 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1092 
1093 	ScanKeyInit(&skey[0],
1094 				Anum_pg_constraint_conrelid,
1095 				BTEqualStrategyNumber, F_OIDEQ,
1096 				ObjectIdGetDatum(relid));
1097 
1098 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1099 							  NULL, 1, skey);
1100 
1101 	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1102 	{
1103 		Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
1104 		Datum		adatum;
1105 		bool		isNull;
1106 		ArrayType  *arr;
1107 		int16	   *attnums;
1108 		int			numkeys;
1109 		int			i;
1110 
1111 		/* Skip constraints that are not PRIMARY KEYs */
1112 		if (con->contype != CONSTRAINT_PRIMARY)
1113 			continue;
1114 
1115 		/*
1116 		 * If the primary key is deferrable, but we've been instructed to
1117 		 * ignore deferrable constraints, then we might as well give up
1118 		 * searching, since there can only be a single primary key on a table.
1119 		 */
1120 		if (con->condeferrable && !deferrableOk)
1121 			break;
1122 
1123 		/* Extract the conkey array, ie, attnums of PK's columns */
1124 		adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
1125 							  RelationGetDescr(pg_constraint), &isNull);
1126 		if (isNull)
1127 			elog(ERROR, "null conkey for constraint %u",
1128 				 ((Form_pg_constraint) GETSTRUCT(tuple))->oid);
1129 		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1130 		numkeys = ARR_DIMS(arr)[0];
1131 		if (ARR_NDIM(arr) != 1 ||
1132 			numkeys < 0 ||
1133 			ARR_HASNULL(arr) ||
1134 			ARR_ELEMTYPE(arr) != INT2OID)
1135 			elog(ERROR, "conkey is not a 1-D smallint array");
1136 		attnums = (int16 *) ARR_DATA_PTR(arr);
1137 
1138 		/* Construct the result value */
1139 		for (i = 0; i < numkeys; i++)
1140 		{
1141 			pkattnos = bms_add_member(pkattnos,
1142 									  attnums[i] - FirstLowInvalidHeapAttributeNumber);
1143 		}
1144 		*constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1145 
1146 		/* No need to search further */
1147 		break;
1148 	}
1149 
1150 	systable_endscan(scan);
1151 
1152 	table_close(pg_constraint, AccessShareLock);
1153 
1154 	return pkattnos;
1155 }
1156 
1157 /*
1158  * Extract data from the pg_constraint tuple of a foreign-key constraint.
1159  *
1160  * All arguments save the first are output arguments; the last three of them
1161  * can be passed as NULL if caller doesn't need them.
1162  */
1163 void
DeconstructFkConstraintRow(HeapTuple tuple,int * numfks,AttrNumber * conkey,AttrNumber * confkey,Oid * pf_eq_oprs,Oid * pp_eq_oprs,Oid * ff_eq_oprs)1164 DeconstructFkConstraintRow(HeapTuple tuple, int *numfks,
1165 						   AttrNumber *conkey, AttrNumber *confkey,
1166 						   Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs)
1167 {
1168 	Oid			constrId;
1169 	Datum		adatum;
1170 	bool		isNull;
1171 	ArrayType  *arr;
1172 	int			numkeys;
1173 
1174 	constrId = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1175 
1176 	/*
1177 	 * We expect the arrays to be 1-D arrays of the right types; verify that.
1178 	 * We don't need to use deconstruct_array() since the array data is just
1179 	 * going to look like a C array of values.
1180 	 */
1181 	adatum = SysCacheGetAttr(CONSTROID, tuple,
1182 							 Anum_pg_constraint_conkey, &isNull);
1183 	if (isNull)
1184 		elog(ERROR, "null conkey for constraint %u", constrId);
1185 	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1186 	if (ARR_NDIM(arr) != 1 ||
1187 		ARR_HASNULL(arr) ||
1188 		ARR_ELEMTYPE(arr) != INT2OID)
1189 		elog(ERROR, "conkey is not a 1-D smallint array");
1190 	numkeys = ARR_DIMS(arr)[0];
1191 	if (numkeys <= 0 || numkeys > INDEX_MAX_KEYS)
1192 		elog(ERROR, "foreign key constraint cannot have %d columns", numkeys);
1193 	memcpy(conkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1194 	if ((Pointer) arr != DatumGetPointer(adatum))
1195 		pfree(arr);				/* free de-toasted copy, if any */
1196 
1197 	adatum = SysCacheGetAttr(CONSTROID, tuple,
1198 							 Anum_pg_constraint_confkey, &isNull);
1199 	if (isNull)
1200 		elog(ERROR, "null confkey for constraint %u", constrId);
1201 	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1202 	if (ARR_NDIM(arr) != 1 ||
1203 		ARR_DIMS(arr)[0] != numkeys ||
1204 		ARR_HASNULL(arr) ||
1205 		ARR_ELEMTYPE(arr) != INT2OID)
1206 		elog(ERROR, "confkey is not a 1-D smallint array");
1207 	memcpy(confkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1208 	if ((Pointer) arr != DatumGetPointer(adatum))
1209 		pfree(arr);				/* free de-toasted copy, if any */
1210 
1211 	if (pf_eq_oprs)
1212 	{
1213 		adatum = SysCacheGetAttr(CONSTROID, tuple,
1214 								 Anum_pg_constraint_conpfeqop, &isNull);
1215 		if (isNull)
1216 			elog(ERROR, "null conpfeqop for constraint %u", constrId);
1217 		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1218 		/* see TryReuseForeignKey if you change the test below */
1219 		if (ARR_NDIM(arr) != 1 ||
1220 			ARR_DIMS(arr)[0] != numkeys ||
1221 			ARR_HASNULL(arr) ||
1222 			ARR_ELEMTYPE(arr) != OIDOID)
1223 			elog(ERROR, "conpfeqop is not a 1-D Oid array");
1224 		memcpy(pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1225 		if ((Pointer) arr != DatumGetPointer(adatum))
1226 			pfree(arr);			/* free de-toasted copy, if any */
1227 	}
1228 
1229 	if (pp_eq_oprs)
1230 	{
1231 		adatum = SysCacheGetAttr(CONSTROID, tuple,
1232 								 Anum_pg_constraint_conppeqop, &isNull);
1233 		if (isNull)
1234 			elog(ERROR, "null conppeqop for constraint %u", constrId);
1235 		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1236 		if (ARR_NDIM(arr) != 1 ||
1237 			ARR_DIMS(arr)[0] != numkeys ||
1238 			ARR_HASNULL(arr) ||
1239 			ARR_ELEMTYPE(arr) != OIDOID)
1240 			elog(ERROR, "conppeqop is not a 1-D Oid array");
1241 		memcpy(pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1242 		if ((Pointer) arr != DatumGetPointer(adatum))
1243 			pfree(arr);			/* free de-toasted copy, if any */
1244 	}
1245 
1246 	if (ff_eq_oprs)
1247 	{
1248 		adatum = SysCacheGetAttr(CONSTROID, tuple,
1249 								 Anum_pg_constraint_conffeqop, &isNull);
1250 		if (isNull)
1251 			elog(ERROR, "null conffeqop for constraint %u", constrId);
1252 		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1253 		if (ARR_NDIM(arr) != 1 ||
1254 			ARR_DIMS(arr)[0] != numkeys ||
1255 			ARR_HASNULL(arr) ||
1256 			ARR_ELEMTYPE(arr) != OIDOID)
1257 			elog(ERROR, "conffeqop is not a 1-D Oid array");
1258 		memcpy(ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1259 		if ((Pointer) arr != DatumGetPointer(adatum))
1260 			pfree(arr);			/* free de-toasted copy, if any */
1261 	}
1262 
1263 	*numfks = numkeys;
1264 }
1265 
1266 /*
1267  * Determine whether a relation can be proven functionally dependent on
1268  * a set of grouping columns.  If so, return true and add the pg_constraint
1269  * OIDs of the constraints needed for the proof to the *constraintDeps list.
1270  *
1271  * grouping_columns is a list of grouping expressions, in which columns of
1272  * the rel of interest are Vars with the indicated varno/varlevelsup.
1273  *
1274  * Currently we only check to see if the rel has a primary key that is a
1275  * subset of the grouping_columns.  We could also use plain unique constraints
1276  * if all their columns are known not null, but there's a problem: we need
1277  * to be able to represent the not-null-ness as part of the constraints added
1278  * to *constraintDeps.  FIXME whenever not-null constraints get represented
1279  * in pg_constraint.
1280  */
1281 bool
check_functional_grouping(Oid relid,Index varno,Index varlevelsup,List * grouping_columns,List ** constraintDeps)1282 check_functional_grouping(Oid relid,
1283 						  Index varno, Index varlevelsup,
1284 						  List *grouping_columns,
1285 						  List **constraintDeps)
1286 {
1287 	Bitmapset  *pkattnos;
1288 	Bitmapset  *groupbyattnos;
1289 	Oid			constraintOid;
1290 	ListCell   *gl;
1291 
1292 	/* If the rel has no PK, then we can't prove functional dependency */
1293 	pkattnos = get_primary_key_attnos(relid, false, &constraintOid);
1294 	if (pkattnos == NULL)
1295 		return false;
1296 
1297 	/* Identify all the rel's columns that appear in grouping_columns */
1298 	groupbyattnos = NULL;
1299 	foreach(gl, grouping_columns)
1300 	{
1301 		Var		   *gvar = (Var *) lfirst(gl);
1302 
1303 		if (IsA(gvar, Var) &&
1304 			gvar->varno == varno &&
1305 			gvar->varlevelsup == varlevelsup)
1306 			groupbyattnos = bms_add_member(groupbyattnos,
1307 										   gvar->varattno - FirstLowInvalidHeapAttributeNumber);
1308 	}
1309 
1310 	if (bms_is_subset(pkattnos, groupbyattnos))
1311 	{
1312 		/* The PK is a subset of grouping_columns, so we win */
1313 		*constraintDeps = lappend_oid(*constraintDeps, constraintOid);
1314 		return true;
1315 	}
1316 
1317 	return false;
1318 }
1319