1 /*-------------------------------------------------------------------------
2  *
3  * pg_constraint.c
4  *	  routines to support manipulation of the pg_constraint relation
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/catalog/pg_constraint.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/sysattr.h"
20 #include "access/table.h"
21 #include "access/tupconvert.h"
22 #include "access/xact.h"
23 #include "catalog/catalog.h"
24 #include "catalog/dependency.h"
25 #include "catalog/indexing.h"
26 #include "catalog/objectaccess.h"
27 #include "catalog/pg_constraint.h"
28 #include "catalog/pg_operator.h"
29 #include "catalog/pg_type.h"
30 #include "commands/defrem.h"
31 #include "commands/tablecmds.h"
32 #include "utils/array.h"
33 #include "utils/builtins.h"
34 #include "utils/fmgroids.h"
35 #include "utils/lsyscache.h"
36 #include "utils/rel.h"
37 #include "utils/syscache.h"
38 
39 
40 /*
41  * CreateConstraintEntry
42  *	Create a constraint table entry.
43  *
44  * Subsidiary records (such as triggers or indexes to implement the
45  * constraint) are *not* created here.  But we do make dependency links
46  * from the constraint to the things it depends on.
47  *
48  * The new constraint's OID is returned.
49  */
50 Oid
CreateConstraintEntry(const char * constraintName,Oid constraintNamespace,char constraintType,bool isDeferrable,bool isDeferred,bool isValidated,Oid parentConstrId,Oid relId,const int16 * constraintKey,int constraintNKeys,int constraintNTotalKeys,Oid domainId,Oid indexRelId,Oid foreignRelId,const int16 * foreignKey,const Oid * pfEqOp,const Oid * ppEqOp,const Oid * ffEqOp,int foreignNKeys,char foreignUpdateType,char foreignDeleteType,char foreignMatchType,const Oid * exclOp,Node * conExpr,const char * conBin,bool conIsLocal,int conInhCount,bool conNoInherit,bool is_internal)51 CreateConstraintEntry(const char *constraintName,
52 					  Oid constraintNamespace,
53 					  char constraintType,
54 					  bool isDeferrable,
55 					  bool isDeferred,
56 					  bool isValidated,
57 					  Oid parentConstrId,
58 					  Oid relId,
59 					  const int16 *constraintKey,
60 					  int constraintNKeys,
61 					  int constraintNTotalKeys,
62 					  Oid domainId,
63 					  Oid indexRelId,
64 					  Oid foreignRelId,
65 					  const int16 *foreignKey,
66 					  const Oid *pfEqOp,
67 					  const Oid *ppEqOp,
68 					  const Oid *ffEqOp,
69 					  int foreignNKeys,
70 					  char foreignUpdateType,
71 					  char foreignDeleteType,
72 					  char foreignMatchType,
73 					  const Oid *exclOp,
74 					  Node *conExpr,
75 					  const char *conBin,
76 					  bool conIsLocal,
77 					  int conInhCount,
78 					  bool conNoInherit,
79 					  bool is_internal)
80 {
81 	Relation	conDesc;
82 	Oid			conOid;
83 	HeapTuple	tup;
84 	bool		nulls[Natts_pg_constraint];
85 	Datum		values[Natts_pg_constraint];
86 	ArrayType  *conkeyArray;
87 	ArrayType  *confkeyArray;
88 	ArrayType  *conpfeqopArray;
89 	ArrayType  *conppeqopArray;
90 	ArrayType  *conffeqopArray;
91 	ArrayType  *conexclopArray;
92 	NameData	cname;
93 	int			i;
94 	ObjectAddress conobject;
95 
96 	conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
97 
98 	Assert(constraintName);
99 	namestrcpy(&cname, constraintName);
100 
101 	/*
102 	 * Convert C arrays into Postgres arrays.
103 	 */
104 	if (constraintNKeys > 0)
105 	{
106 		Datum	   *conkey;
107 
108 		conkey = (Datum *) palloc(constraintNKeys * sizeof(Datum));
109 		for (i = 0; i < constraintNKeys; i++)
110 			conkey[i] = Int16GetDatum(constraintKey[i]);
111 		conkeyArray = construct_array(conkey, constraintNKeys,
112 									  INT2OID, 2, true, 's');
113 	}
114 	else
115 		conkeyArray = NULL;
116 
117 	if (foreignNKeys > 0)
118 	{
119 		Datum	   *fkdatums;
120 
121 		fkdatums = (Datum *) palloc(foreignNKeys * sizeof(Datum));
122 		for (i = 0; i < foreignNKeys; i++)
123 			fkdatums[i] = Int16GetDatum(foreignKey[i]);
124 		confkeyArray = construct_array(fkdatums, foreignNKeys,
125 									   INT2OID, 2, true, 's');
126 		for (i = 0; i < foreignNKeys; i++)
127 			fkdatums[i] = ObjectIdGetDatum(pfEqOp[i]);
128 		conpfeqopArray = construct_array(fkdatums, foreignNKeys,
129 										 OIDOID, sizeof(Oid), true, 'i');
130 		for (i = 0; i < foreignNKeys; i++)
131 			fkdatums[i] = ObjectIdGetDatum(ppEqOp[i]);
132 		conppeqopArray = construct_array(fkdatums, foreignNKeys,
133 										 OIDOID, sizeof(Oid), true, 'i');
134 		for (i = 0; i < foreignNKeys; i++)
135 			fkdatums[i] = ObjectIdGetDatum(ffEqOp[i]);
136 		conffeqopArray = construct_array(fkdatums, foreignNKeys,
137 										 OIDOID, sizeof(Oid), true, 'i');
138 	}
139 	else
140 	{
141 		confkeyArray = NULL;
142 		conpfeqopArray = NULL;
143 		conppeqopArray = NULL;
144 		conffeqopArray = NULL;
145 	}
146 
147 	if (exclOp != NULL)
148 	{
149 		Datum	   *opdatums;
150 
151 		opdatums = (Datum *) palloc(constraintNKeys * sizeof(Datum));
152 		for (i = 0; i < constraintNKeys; i++)
153 			opdatums[i] = ObjectIdGetDatum(exclOp[i]);
154 		conexclopArray = construct_array(opdatums, constraintNKeys,
155 										 OIDOID, sizeof(Oid), true, 'i');
156 	}
157 	else
158 		conexclopArray = NULL;
159 
160 	/* initialize nulls and values */
161 	for (i = 0; i < Natts_pg_constraint; i++)
162 	{
163 		nulls[i] = false;
164 		values[i] = (Datum) NULL;
165 	}
166 
167 	conOid = GetNewOidWithIndex(conDesc, ConstraintOidIndexId,
168 								Anum_pg_constraint_oid);
169 	values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(conOid);
170 	values[Anum_pg_constraint_conname - 1] = NameGetDatum(&cname);
171 	values[Anum_pg_constraint_connamespace - 1] = ObjectIdGetDatum(constraintNamespace);
172 	values[Anum_pg_constraint_contype - 1] = CharGetDatum(constraintType);
173 	values[Anum_pg_constraint_condeferrable - 1] = BoolGetDatum(isDeferrable);
174 	values[Anum_pg_constraint_condeferred - 1] = BoolGetDatum(isDeferred);
175 	values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValidated);
176 	values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(relId);
177 	values[Anum_pg_constraint_contypid - 1] = ObjectIdGetDatum(domainId);
178 	values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(indexRelId);
179 	values[Anum_pg_constraint_conparentid - 1] = ObjectIdGetDatum(parentConstrId);
180 	values[Anum_pg_constraint_confrelid - 1] = ObjectIdGetDatum(foreignRelId);
181 	values[Anum_pg_constraint_confupdtype - 1] = CharGetDatum(foreignUpdateType);
182 	values[Anum_pg_constraint_confdeltype - 1] = CharGetDatum(foreignDeleteType);
183 	values[Anum_pg_constraint_confmatchtype - 1] = CharGetDatum(foreignMatchType);
184 	values[Anum_pg_constraint_conislocal - 1] = BoolGetDatum(conIsLocal);
185 	values[Anum_pg_constraint_coninhcount - 1] = Int32GetDatum(conInhCount);
186 	values[Anum_pg_constraint_connoinherit - 1] = BoolGetDatum(conNoInherit);
187 
188 	if (conkeyArray)
189 		values[Anum_pg_constraint_conkey - 1] = PointerGetDatum(conkeyArray);
190 	else
191 		nulls[Anum_pg_constraint_conkey - 1] = true;
192 
193 	if (confkeyArray)
194 		values[Anum_pg_constraint_confkey - 1] = PointerGetDatum(confkeyArray);
195 	else
196 		nulls[Anum_pg_constraint_confkey - 1] = true;
197 
198 	if (conpfeqopArray)
199 		values[Anum_pg_constraint_conpfeqop - 1] = PointerGetDatum(conpfeqopArray);
200 	else
201 		nulls[Anum_pg_constraint_conpfeqop - 1] = true;
202 
203 	if (conppeqopArray)
204 		values[Anum_pg_constraint_conppeqop - 1] = PointerGetDatum(conppeqopArray);
205 	else
206 		nulls[Anum_pg_constraint_conppeqop - 1] = true;
207 
208 	if (conffeqopArray)
209 		values[Anum_pg_constraint_conffeqop - 1] = PointerGetDatum(conffeqopArray);
210 	else
211 		nulls[Anum_pg_constraint_conffeqop - 1] = true;
212 
213 	if (conexclopArray)
214 		values[Anum_pg_constraint_conexclop - 1] = PointerGetDatum(conexclopArray);
215 	else
216 		nulls[Anum_pg_constraint_conexclop - 1] = true;
217 
218 	if (conBin)
219 		values[Anum_pg_constraint_conbin - 1] = CStringGetTextDatum(conBin);
220 	else
221 		nulls[Anum_pg_constraint_conbin - 1] = true;
222 
223 	tup = heap_form_tuple(RelationGetDescr(conDesc), values, nulls);
224 
225 	CatalogTupleInsert(conDesc, tup);
226 
227 	conobject.classId = ConstraintRelationId;
228 	conobject.objectId = conOid;
229 	conobject.objectSubId = 0;
230 
231 	table_close(conDesc, RowExclusiveLock);
232 
233 	if (OidIsValid(relId))
234 	{
235 		/*
236 		 * Register auto dependency from constraint to owning relation, or to
237 		 * specific column(s) if any are mentioned.
238 		 */
239 		ObjectAddress relobject;
240 
241 		relobject.classId = RelationRelationId;
242 		relobject.objectId = relId;
243 		if (constraintNTotalKeys > 0)
244 		{
245 			for (i = 0; i < constraintNTotalKeys; i++)
246 			{
247 				relobject.objectSubId = constraintKey[i];
248 
249 				recordDependencyOn(&conobject, &relobject, DEPENDENCY_AUTO);
250 			}
251 		}
252 		else
253 		{
254 			relobject.objectSubId = 0;
255 
256 			recordDependencyOn(&conobject, &relobject, DEPENDENCY_AUTO);
257 		}
258 	}
259 
260 	if (OidIsValid(domainId))
261 	{
262 		/*
263 		 * Register auto dependency from constraint to owning domain
264 		 */
265 		ObjectAddress domobject;
266 
267 		domobject.classId = TypeRelationId;
268 		domobject.objectId = domainId;
269 		domobject.objectSubId = 0;
270 
271 		recordDependencyOn(&conobject, &domobject, DEPENDENCY_AUTO);
272 	}
273 
274 	if (OidIsValid(foreignRelId))
275 	{
276 		/*
277 		 * Register normal dependency from constraint to foreign relation, or
278 		 * to specific column(s) if any are mentioned.
279 		 */
280 		ObjectAddress relobject;
281 
282 		relobject.classId = RelationRelationId;
283 		relobject.objectId = foreignRelId;
284 		if (foreignNKeys > 0)
285 		{
286 			for (i = 0; i < foreignNKeys; i++)
287 			{
288 				relobject.objectSubId = foreignKey[i];
289 
290 				recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
291 			}
292 		}
293 		else
294 		{
295 			relobject.objectSubId = 0;
296 
297 			recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
298 		}
299 	}
300 
301 	if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN)
302 	{
303 		/*
304 		 * Register normal dependency on the unique index that supports a
305 		 * foreign-key constraint.  (Note: for indexes associated with unique
306 		 * or primary-key constraints, the dependency runs the other way, and
307 		 * is not made here.)
308 		 */
309 		ObjectAddress relobject;
310 
311 		relobject.classId = RelationRelationId;
312 		relobject.objectId = indexRelId;
313 		relobject.objectSubId = 0;
314 
315 		recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
316 	}
317 
318 	if (foreignNKeys > 0)
319 	{
320 		/*
321 		 * Register normal dependencies on the equality operators that support
322 		 * a foreign-key constraint.  If the PK and FK types are the same then
323 		 * all three operators for a column are the same; otherwise they are
324 		 * different.
325 		 */
326 		ObjectAddress oprobject;
327 
328 		oprobject.classId = OperatorRelationId;
329 		oprobject.objectSubId = 0;
330 
331 		for (i = 0; i < foreignNKeys; i++)
332 		{
333 			oprobject.objectId = pfEqOp[i];
334 			recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
335 			if (ppEqOp[i] != pfEqOp[i])
336 			{
337 				oprobject.objectId = ppEqOp[i];
338 				recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
339 			}
340 			if (ffEqOp[i] != pfEqOp[i])
341 			{
342 				oprobject.objectId = ffEqOp[i];
343 				recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
344 			}
345 		}
346 	}
347 
348 	/*
349 	 * We don't bother to register dependencies on the exclusion operators of
350 	 * an exclusion constraint.  We assume they are members of the opclass
351 	 * supporting the index, so there's an indirect dependency via that. (This
352 	 * would be pretty dicey for cross-type operators, but exclusion operators
353 	 * can never be cross-type.)
354 	 */
355 
356 	if (conExpr != NULL)
357 	{
358 		/*
359 		 * Register dependencies from constraint to objects mentioned in CHECK
360 		 * expression.
361 		 */
362 		recordDependencyOnSingleRelExpr(&conobject, conExpr, relId,
363 										DEPENDENCY_NORMAL,
364 										DEPENDENCY_NORMAL, false);
365 	}
366 
367 	/* Post creation hook for new constraint */
368 	InvokeObjectPostCreateHookArg(ConstraintRelationId, conOid, 0,
369 								  is_internal);
370 
371 	return conOid;
372 }
373 
374 /*
375  * Test whether given name is currently used as a constraint name
376  * for the given object (relation or domain).
377  *
378  * This is used to decide whether to accept a user-specified constraint name.
379  * It is deliberately not the same test as ChooseConstraintName uses to decide
380  * whether an auto-generated name is OK: here, we will allow it unless there
381  * is an identical constraint name in use *on the same object*.
382  *
383  * NB: Caller should hold exclusive lock on the given object, else
384  * this test can be fooled by concurrent additions.
385  */
386 bool
ConstraintNameIsUsed(ConstraintCategory conCat,Oid objId,const char * conname)387 ConstraintNameIsUsed(ConstraintCategory conCat, Oid objId,
388 					 const char *conname)
389 {
390 	bool		found;
391 	Relation	conDesc;
392 	SysScanDesc conscan;
393 	ScanKeyData skey[3];
394 
395 	conDesc = table_open(ConstraintRelationId, AccessShareLock);
396 
397 	ScanKeyInit(&skey[0],
398 				Anum_pg_constraint_conrelid,
399 				BTEqualStrategyNumber, F_OIDEQ,
400 				ObjectIdGetDatum((conCat == CONSTRAINT_RELATION)
401 								 ? objId : InvalidOid));
402 	ScanKeyInit(&skey[1],
403 				Anum_pg_constraint_contypid,
404 				BTEqualStrategyNumber, F_OIDEQ,
405 				ObjectIdGetDatum((conCat == CONSTRAINT_DOMAIN)
406 								 ? objId : InvalidOid));
407 	ScanKeyInit(&skey[2],
408 				Anum_pg_constraint_conname,
409 				BTEqualStrategyNumber, F_NAMEEQ,
410 				CStringGetDatum(conname));
411 
412 	conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId,
413 								 true, NULL, 3, skey);
414 
415 	/* There can be at most one matching row */
416 	found = (HeapTupleIsValid(systable_getnext(conscan)));
417 
418 	systable_endscan(conscan);
419 	table_close(conDesc, AccessShareLock);
420 
421 	return found;
422 }
423 
424 /*
425  * Does any constraint of the given name exist in the given namespace?
426  *
427  * This is used for code that wants to match ChooseConstraintName's rule
428  * that we should avoid autogenerating duplicate constraint names within a
429  * namespace.
430  */
431 bool
ConstraintNameExists(const char * conname,Oid namespaceid)432 ConstraintNameExists(const char *conname, Oid namespaceid)
433 {
434 	bool		found;
435 	Relation	conDesc;
436 	SysScanDesc conscan;
437 	ScanKeyData skey[2];
438 
439 	conDesc = table_open(ConstraintRelationId, AccessShareLock);
440 
441 	ScanKeyInit(&skey[0],
442 				Anum_pg_constraint_conname,
443 				BTEqualStrategyNumber, F_NAMEEQ,
444 				CStringGetDatum(conname));
445 
446 	ScanKeyInit(&skey[1],
447 				Anum_pg_constraint_connamespace,
448 				BTEqualStrategyNumber, F_OIDEQ,
449 				ObjectIdGetDatum(namespaceid));
450 
451 	conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
452 								 NULL, 2, skey);
453 
454 	found = (HeapTupleIsValid(systable_getnext(conscan)));
455 
456 	systable_endscan(conscan);
457 	table_close(conDesc, AccessShareLock);
458 
459 	return found;
460 }
461 
462 /*
463  * Select a nonconflicting name for a new constraint.
464  *
465  * The objective here is to choose a name that is unique within the
466  * specified namespace.  Postgres does not require this, but the SQL
467  * spec does, and some apps depend on it.  Therefore we avoid choosing
468  * default names that so conflict.
469  *
470  * name1, name2, and label are used the same way as for makeObjectName(),
471  * except that the label can't be NULL; digits will be appended to the label
472  * if needed to create a name that is unique within the specified namespace.
473  *
474  * 'others' can be a list of string names already chosen within the current
475  * command (but not yet reflected into the catalogs); we will not choose
476  * a duplicate of one of these either.
477  *
478  * Note: it is theoretically possible to get a collision anyway, if someone
479  * else chooses the same name concurrently.  This is fairly unlikely to be
480  * a problem in practice, especially if one is holding an exclusive lock on
481  * the relation identified by name1.
482  *
483  * Returns a palloc'd string.
484  */
485 char *
ChooseConstraintName(const char * name1,const char * name2,const char * label,Oid namespaceid,List * others)486 ChooseConstraintName(const char *name1, const char *name2,
487 					 const char *label, Oid namespaceid,
488 					 List *others)
489 {
490 	int			pass = 0;
491 	char	   *conname = NULL;
492 	char		modlabel[NAMEDATALEN];
493 	Relation	conDesc;
494 	SysScanDesc conscan;
495 	ScanKeyData skey[2];
496 	bool		found;
497 	ListCell   *l;
498 
499 	conDesc = table_open(ConstraintRelationId, AccessShareLock);
500 
501 	/* try the unmodified label first */
502 	StrNCpy(modlabel, label, sizeof(modlabel));
503 
504 	for (;;)
505 	{
506 		conname = makeObjectName(name1, name2, modlabel);
507 
508 		found = false;
509 
510 		foreach(l, others)
511 		{
512 			if (strcmp((char *) lfirst(l), conname) == 0)
513 			{
514 				found = true;
515 				break;
516 			}
517 		}
518 
519 		if (!found)
520 		{
521 			ScanKeyInit(&skey[0],
522 						Anum_pg_constraint_conname,
523 						BTEqualStrategyNumber, F_NAMEEQ,
524 						CStringGetDatum(conname));
525 
526 			ScanKeyInit(&skey[1],
527 						Anum_pg_constraint_connamespace,
528 						BTEqualStrategyNumber, F_OIDEQ,
529 						ObjectIdGetDatum(namespaceid));
530 
531 			conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
532 										 NULL, 2, skey);
533 
534 			found = (HeapTupleIsValid(systable_getnext(conscan)));
535 
536 			systable_endscan(conscan);
537 		}
538 
539 		if (!found)
540 			break;
541 
542 		/* found a conflict, so try a new name component */
543 		pfree(conname);
544 		snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
545 	}
546 
547 	table_close(conDesc, AccessShareLock);
548 
549 	return conname;
550 }
551 
552 /*
553  * Delete a single constraint record.
554  */
555 void
RemoveConstraintById(Oid conId)556 RemoveConstraintById(Oid conId)
557 {
558 	Relation	conDesc;
559 	HeapTuple	tup;
560 	Form_pg_constraint con;
561 
562 	conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
563 
564 	tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conId));
565 	if (!HeapTupleIsValid(tup)) /* should not happen */
566 		elog(ERROR, "cache lookup failed for constraint %u", conId);
567 	con = (Form_pg_constraint) GETSTRUCT(tup);
568 
569 	/*
570 	 * Special processing depending on what the constraint is for.
571 	 */
572 	if (OidIsValid(con->conrelid))
573 	{
574 		Relation	rel;
575 
576 		/*
577 		 * If the constraint is for a relation, open and exclusive-lock the
578 		 * relation it's for.
579 		 */
580 		rel = table_open(con->conrelid, AccessExclusiveLock);
581 
582 		/*
583 		 * We need to update the relcheck count if it is a check constraint
584 		 * being dropped.  This update will force backends to rebuild relcache
585 		 * entries when we commit.
586 		 */
587 		if (con->contype == CONSTRAINT_CHECK)
588 		{
589 			Relation	pgrel;
590 			HeapTuple	relTup;
591 			Form_pg_class classForm;
592 
593 			pgrel = table_open(RelationRelationId, RowExclusiveLock);
594 			relTup = SearchSysCacheCopy1(RELOID,
595 										 ObjectIdGetDatum(con->conrelid));
596 			if (!HeapTupleIsValid(relTup))
597 				elog(ERROR, "cache lookup failed for relation %u",
598 					 con->conrelid);
599 			classForm = (Form_pg_class) GETSTRUCT(relTup);
600 
601 			if (classForm->relchecks == 0)	/* should not happen */
602 				elog(ERROR, "relation \"%s\" has relchecks = 0",
603 					 RelationGetRelationName(rel));
604 			classForm->relchecks--;
605 
606 			CatalogTupleUpdate(pgrel, &relTup->t_self, relTup);
607 
608 			heap_freetuple(relTup);
609 
610 			table_close(pgrel, RowExclusiveLock);
611 		}
612 
613 		/* Keep lock on constraint's rel until end of xact */
614 		table_close(rel, NoLock);
615 	}
616 	else if (OidIsValid(con->contypid))
617 	{
618 		/*
619 		 * XXX for now, do nothing special when dropping a domain constraint
620 		 *
621 		 * Probably there should be some form of locking on the domain type,
622 		 * but we have no such concept at the moment.
623 		 */
624 	}
625 	else
626 		elog(ERROR, "constraint %u is not of a known type", conId);
627 
628 	/* Fry the constraint itself */
629 	CatalogTupleDelete(conDesc, &tup->t_self);
630 
631 	/* Clean up */
632 	ReleaseSysCache(tup);
633 	table_close(conDesc, RowExclusiveLock);
634 }
635 
636 /*
637  * RenameConstraintById
638  *		Rename a constraint.
639  *
640  * Note: this isn't intended to be a user-exposed function; it doesn't check
641  * permissions etc.  Currently this is only invoked when renaming an index
642  * that is associated with a constraint, but it's made a little more general
643  * than that with the expectation of someday having ALTER TABLE RENAME
644  * CONSTRAINT.
645  */
646 void
RenameConstraintById(Oid conId,const char * newname)647 RenameConstraintById(Oid conId, const char *newname)
648 {
649 	Relation	conDesc;
650 	HeapTuple	tuple;
651 	Form_pg_constraint con;
652 
653 	conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
654 
655 	tuple = SearchSysCacheCopy1(CONSTROID, ObjectIdGetDatum(conId));
656 	if (!HeapTupleIsValid(tuple))
657 		elog(ERROR, "cache lookup failed for constraint %u", conId);
658 	con = (Form_pg_constraint) GETSTRUCT(tuple);
659 
660 	/*
661 	 * For user-friendliness, check whether the name is already in use.
662 	 */
663 	if (OidIsValid(con->conrelid) &&
664 		ConstraintNameIsUsed(CONSTRAINT_RELATION,
665 							 con->conrelid,
666 							 newname))
667 		ereport(ERROR,
668 				(errcode(ERRCODE_DUPLICATE_OBJECT),
669 				 errmsg("constraint \"%s\" for relation \"%s\" already exists",
670 						newname, get_rel_name(con->conrelid))));
671 	if (OidIsValid(con->contypid) &&
672 		ConstraintNameIsUsed(CONSTRAINT_DOMAIN,
673 							 con->contypid,
674 							 newname))
675 		ereport(ERROR,
676 				(errcode(ERRCODE_DUPLICATE_OBJECT),
677 				 errmsg("constraint \"%s\" for domain %s already exists",
678 						newname, format_type_be(con->contypid))));
679 
680 	/* OK, do the rename --- tuple is a copy, so OK to scribble on it */
681 	namestrcpy(&(con->conname), newname);
682 
683 	CatalogTupleUpdate(conDesc, &tuple->t_self, tuple);
684 
685 	InvokeObjectPostAlterHook(ConstraintRelationId, conId, 0);
686 
687 	heap_freetuple(tuple);
688 	table_close(conDesc, RowExclusiveLock);
689 }
690 
691 /*
692  * AlterConstraintNamespaces
693  *		Find any constraints belonging to the specified object,
694  *		and move them to the specified new namespace.
695  *
696  * isType indicates whether the owning object is a type or a relation.
697  */
698 void
AlterConstraintNamespaces(Oid ownerId,Oid oldNspId,Oid newNspId,bool isType,ObjectAddresses * objsMoved)699 AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
700 						  Oid newNspId, bool isType, ObjectAddresses *objsMoved)
701 {
702 	Relation	conRel;
703 	ScanKeyData key[2];
704 	SysScanDesc scan;
705 	HeapTuple	tup;
706 
707 	conRel = table_open(ConstraintRelationId, RowExclusiveLock);
708 
709 	ScanKeyInit(&key[0],
710 				Anum_pg_constraint_conrelid,
711 				BTEqualStrategyNumber, F_OIDEQ,
712 				ObjectIdGetDatum(isType ? InvalidOid : ownerId));
713 	ScanKeyInit(&key[1],
714 				Anum_pg_constraint_contypid,
715 				BTEqualStrategyNumber, F_OIDEQ,
716 				ObjectIdGetDatum(isType ? ownerId : InvalidOid));
717 
718 	scan = systable_beginscan(conRel, ConstraintRelidTypidNameIndexId, true,
719 							  NULL, 2, key);
720 
721 	while (HeapTupleIsValid((tup = systable_getnext(scan))))
722 	{
723 		Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
724 		ObjectAddress thisobj;
725 
726 		thisobj.classId = ConstraintRelationId;
727 		thisobj.objectId = conform->oid;
728 		thisobj.objectSubId = 0;
729 
730 		if (object_address_present(&thisobj, objsMoved))
731 			continue;
732 
733 		/* Don't update if the object is already part of the namespace */
734 		if (conform->connamespace == oldNspId && oldNspId != newNspId)
735 		{
736 			tup = heap_copytuple(tup);
737 			conform = (Form_pg_constraint) GETSTRUCT(tup);
738 
739 			conform->connamespace = newNspId;
740 
741 			CatalogTupleUpdate(conRel, &tup->t_self, tup);
742 
743 			/*
744 			 * Note: currently, the constraint will not have its own
745 			 * dependency on the namespace, so we don't need to do
746 			 * changeDependencyFor().
747 			 */
748 		}
749 
750 		InvokeObjectPostAlterHook(ConstraintRelationId, thisobj.objectId, 0);
751 
752 		add_exact_object_address(&thisobj, objsMoved);
753 	}
754 
755 	systable_endscan(scan);
756 
757 	table_close(conRel, RowExclusiveLock);
758 }
759 
760 /*
761  * ConstraintSetParentConstraint
762  *		Set a partition's constraint as child of its parent constraint,
763  *		or remove the linkage if parentConstrId is InvalidOid.
764  *
765  * This updates the constraint's pg_constraint row to show it as inherited, and
766  * adds PARTITION dependencies to prevent the constraint from being deleted
767  * on its own.  Alternatively, reverse that.
768  */
769 void
ConstraintSetParentConstraint(Oid childConstrId,Oid parentConstrId,Oid childTableId)770 ConstraintSetParentConstraint(Oid childConstrId,
771 							  Oid parentConstrId,
772 							  Oid childTableId)
773 {
774 	Relation	constrRel;
775 	Form_pg_constraint constrForm;
776 	HeapTuple	tuple,
777 				newtup;
778 	ObjectAddress depender;
779 	ObjectAddress referenced;
780 
781 	constrRel = table_open(ConstraintRelationId, RowExclusiveLock);
782 	tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(childConstrId));
783 	if (!HeapTupleIsValid(tuple))
784 		elog(ERROR, "cache lookup failed for constraint %u", childConstrId);
785 	newtup = heap_copytuple(tuple);
786 	constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
787 	if (OidIsValid(parentConstrId))
788 	{
789 		/* don't allow setting parent for a constraint that already has one */
790 		Assert(constrForm->coninhcount == 0);
791 		if (constrForm->conparentid != InvalidOid)
792 			elog(ERROR, "constraint %u already has a parent constraint",
793 				 childConstrId);
794 
795 		constrForm->conislocal = false;
796 		constrForm->coninhcount++;
797 		constrForm->conparentid = parentConstrId;
798 
799 		CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
800 
801 		ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
802 
803 		ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
804 		recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_PRI);
805 
806 		ObjectAddressSet(referenced, RelationRelationId, childTableId);
807 		recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_SEC);
808 	}
809 	else
810 	{
811 		constrForm->coninhcount--;
812 		constrForm->conislocal = true;
813 		constrForm->conparentid = InvalidOid;
814 
815 		/* Make sure there's no further inheritance. */
816 		Assert(constrForm->coninhcount == 0);
817 
818 		CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
819 
820 		deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
821 										ConstraintRelationId,
822 										DEPENDENCY_PARTITION_PRI);
823 		deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
824 										RelationRelationId,
825 										DEPENDENCY_PARTITION_SEC);
826 	}
827 
828 	ReleaseSysCache(tuple);
829 	table_close(constrRel, RowExclusiveLock);
830 }
831 
832 
833 /*
834  * get_relation_constraint_oid
835  *		Find a constraint on the specified relation with the specified name.
836  *		Returns constraint's OID.
837  */
838 Oid
get_relation_constraint_oid(Oid relid,const char * conname,bool missing_ok)839 get_relation_constraint_oid(Oid relid, const char *conname, bool missing_ok)
840 {
841 	Relation	pg_constraint;
842 	HeapTuple	tuple;
843 	SysScanDesc scan;
844 	ScanKeyData skey[3];
845 	Oid			conOid = InvalidOid;
846 
847 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
848 
849 	ScanKeyInit(&skey[0],
850 				Anum_pg_constraint_conrelid,
851 				BTEqualStrategyNumber, F_OIDEQ,
852 				ObjectIdGetDatum(relid));
853 	ScanKeyInit(&skey[1],
854 				Anum_pg_constraint_contypid,
855 				BTEqualStrategyNumber, F_OIDEQ,
856 				ObjectIdGetDatum(InvalidOid));
857 	ScanKeyInit(&skey[2],
858 				Anum_pg_constraint_conname,
859 				BTEqualStrategyNumber, F_NAMEEQ,
860 				CStringGetDatum(conname));
861 
862 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
863 							  NULL, 3, skey);
864 
865 	/* There can be at most one matching row */
866 	if (HeapTupleIsValid(tuple = systable_getnext(scan)))
867 		conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
868 
869 	systable_endscan(scan);
870 
871 	/* If no such constraint exists, complain */
872 	if (!OidIsValid(conOid) && !missing_ok)
873 		ereport(ERROR,
874 				(errcode(ERRCODE_UNDEFINED_OBJECT),
875 				 errmsg("constraint \"%s\" for table \"%s\" does not exist",
876 						conname, get_rel_name(relid))));
877 
878 	table_close(pg_constraint, AccessShareLock);
879 
880 	return conOid;
881 }
882 
883 /*
884  * get_relation_constraint_attnos
885  *		Find a constraint on the specified relation with the specified name
886  *		and return the constrained columns.
887  *
888  * Returns a Bitmapset of the column attnos of the constrained columns, with
889  * attnos being offset by FirstLowInvalidHeapAttributeNumber so that system
890  * columns can be represented.
891  *
892  * *constraintOid is set to the OID of the constraint, or InvalidOid on
893  * failure.
894  */
895 Bitmapset *
get_relation_constraint_attnos(Oid relid,const char * conname,bool missing_ok,Oid * constraintOid)896 get_relation_constraint_attnos(Oid relid, const char *conname,
897 							   bool missing_ok, Oid *constraintOid)
898 {
899 	Bitmapset  *conattnos = NULL;
900 	Relation	pg_constraint;
901 	HeapTuple	tuple;
902 	SysScanDesc scan;
903 	ScanKeyData skey[3];
904 
905 	/* Set *constraintOid, to avoid complaints about uninitialized vars */
906 	*constraintOid = InvalidOid;
907 
908 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
909 
910 	ScanKeyInit(&skey[0],
911 				Anum_pg_constraint_conrelid,
912 				BTEqualStrategyNumber, F_OIDEQ,
913 				ObjectIdGetDatum(relid));
914 	ScanKeyInit(&skey[1],
915 				Anum_pg_constraint_contypid,
916 				BTEqualStrategyNumber, F_OIDEQ,
917 				ObjectIdGetDatum(InvalidOid));
918 	ScanKeyInit(&skey[2],
919 				Anum_pg_constraint_conname,
920 				BTEqualStrategyNumber, F_NAMEEQ,
921 				CStringGetDatum(conname));
922 
923 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
924 							  NULL, 3, skey);
925 
926 	/* There can be at most one matching row */
927 	if (HeapTupleIsValid(tuple = systable_getnext(scan)))
928 	{
929 		Datum		adatum;
930 		bool		isNull;
931 
932 		*constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
933 
934 		/* Extract the conkey array, ie, attnums of constrained columns */
935 		adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
936 							  RelationGetDescr(pg_constraint), &isNull);
937 		if (!isNull)
938 		{
939 			ArrayType  *arr;
940 			int			numcols;
941 			int16	   *attnums;
942 			int			i;
943 
944 			arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
945 			numcols = ARR_DIMS(arr)[0];
946 			if (ARR_NDIM(arr) != 1 ||
947 				numcols < 0 ||
948 				ARR_HASNULL(arr) ||
949 				ARR_ELEMTYPE(arr) != INT2OID)
950 				elog(ERROR, "conkey is not a 1-D smallint array");
951 			attnums = (int16 *) ARR_DATA_PTR(arr);
952 
953 			/* Construct the result value */
954 			for (i = 0; i < numcols; i++)
955 			{
956 				conattnos = bms_add_member(conattnos,
957 										   attnums[i] - FirstLowInvalidHeapAttributeNumber);
958 			}
959 		}
960 	}
961 
962 	systable_endscan(scan);
963 
964 	/* If no such constraint exists, complain */
965 	if (!OidIsValid(*constraintOid) && !missing_ok)
966 		ereport(ERROR,
967 				(errcode(ERRCODE_UNDEFINED_OBJECT),
968 				 errmsg("constraint \"%s\" for table \"%s\" does not exist",
969 						conname, get_rel_name(relid))));
970 
971 	table_close(pg_constraint, AccessShareLock);
972 
973 	return conattnos;
974 }
975 
976 /*
977  * Return the OID of the constraint associated with the given index in the
978  * given relation; or InvalidOid if no such index is catalogued.
979  */
980 Oid
get_relation_idx_constraint_oid(Oid relationId,Oid indexId)981 get_relation_idx_constraint_oid(Oid relationId, Oid indexId)
982 {
983 	Relation	pg_constraint;
984 	SysScanDesc scan;
985 	ScanKeyData key;
986 	HeapTuple	tuple;
987 	Oid			constraintId = InvalidOid;
988 
989 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
990 
991 	ScanKeyInit(&key,
992 				Anum_pg_constraint_conrelid,
993 				BTEqualStrategyNumber,
994 				F_OIDEQ,
995 				ObjectIdGetDatum(relationId));
996 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
997 							  true, NULL, 1, &key);
998 	while ((tuple = systable_getnext(scan)) != NULL)
999 	{
1000 		Form_pg_constraint constrForm;
1001 
1002 		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
1003 		if (constrForm->conindid == indexId)
1004 		{
1005 			constraintId = constrForm->oid;
1006 			break;
1007 		}
1008 	}
1009 	systable_endscan(scan);
1010 
1011 	table_close(pg_constraint, AccessShareLock);
1012 	return constraintId;
1013 }
1014 
1015 /*
1016  * get_domain_constraint_oid
1017  *		Find a constraint on the specified domain with the specified name.
1018  *		Returns constraint's OID.
1019  */
1020 Oid
get_domain_constraint_oid(Oid typid,const char * conname,bool missing_ok)1021 get_domain_constraint_oid(Oid typid, const char *conname, bool missing_ok)
1022 {
1023 	Relation	pg_constraint;
1024 	HeapTuple	tuple;
1025 	SysScanDesc scan;
1026 	ScanKeyData skey[3];
1027 	Oid			conOid = InvalidOid;
1028 
1029 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1030 
1031 	ScanKeyInit(&skey[0],
1032 				Anum_pg_constraint_conrelid,
1033 				BTEqualStrategyNumber, F_OIDEQ,
1034 				ObjectIdGetDatum(InvalidOid));
1035 	ScanKeyInit(&skey[1],
1036 				Anum_pg_constraint_contypid,
1037 				BTEqualStrategyNumber, F_OIDEQ,
1038 				ObjectIdGetDatum(typid));
1039 	ScanKeyInit(&skey[2],
1040 				Anum_pg_constraint_conname,
1041 				BTEqualStrategyNumber, F_NAMEEQ,
1042 				CStringGetDatum(conname));
1043 
1044 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1045 							  NULL, 3, skey);
1046 
1047 	/* There can be at most one matching row */
1048 	if (HeapTupleIsValid(tuple = systable_getnext(scan)))
1049 		conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1050 
1051 	systable_endscan(scan);
1052 
1053 	/* If no such constraint exists, complain */
1054 	if (!OidIsValid(conOid) && !missing_ok)
1055 		ereport(ERROR,
1056 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1057 				 errmsg("constraint \"%s\" for domain %s does not exist",
1058 						conname, format_type_be(typid))));
1059 
1060 	table_close(pg_constraint, AccessShareLock);
1061 
1062 	return conOid;
1063 }
1064 
1065 /*
1066  * get_primary_key_attnos
1067  *		Identify the columns in a relation's primary key, if any.
1068  *
1069  * Returns a Bitmapset of the column attnos of the primary key's columns,
1070  * with attnos being offset by FirstLowInvalidHeapAttributeNumber so that
1071  * system columns can be represented.
1072  *
1073  * If there is no primary key, return NULL.  We also return NULL if the pkey
1074  * constraint is deferrable and deferrableOk is false.
1075  *
1076  * *constraintOid is set to the OID of the pkey constraint, or InvalidOid
1077  * on failure.
1078  */
1079 Bitmapset *
get_primary_key_attnos(Oid relid,bool deferrableOk,Oid * constraintOid)1080 get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid)
1081 {
1082 	Bitmapset  *pkattnos = NULL;
1083 	Relation	pg_constraint;
1084 	HeapTuple	tuple;
1085 	SysScanDesc scan;
1086 	ScanKeyData skey[1];
1087 
1088 	/* Set *constraintOid, to avoid complaints about uninitialized vars */
1089 	*constraintOid = InvalidOid;
1090 
1091 	/* Scan pg_constraint for constraints of the target rel */
1092 	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1093 
1094 	ScanKeyInit(&skey[0],
1095 				Anum_pg_constraint_conrelid,
1096 				BTEqualStrategyNumber, F_OIDEQ,
1097 				ObjectIdGetDatum(relid));
1098 
1099 	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1100 							  NULL, 1, skey);
1101 
1102 	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1103 	{
1104 		Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
1105 		Datum		adatum;
1106 		bool		isNull;
1107 		ArrayType  *arr;
1108 		int16	   *attnums;
1109 		int			numkeys;
1110 		int			i;
1111 
1112 		/* Skip constraints that are not PRIMARY KEYs */
1113 		if (con->contype != CONSTRAINT_PRIMARY)
1114 			continue;
1115 
1116 		/*
1117 		 * If the primary key is deferrable, but we've been instructed to
1118 		 * ignore deferrable constraints, then we might as well give up
1119 		 * searching, since there can only be a single primary key on a table.
1120 		 */
1121 		if (con->condeferrable && !deferrableOk)
1122 			break;
1123 
1124 		/* Extract the conkey array, ie, attnums of PK's columns */
1125 		adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
1126 							  RelationGetDescr(pg_constraint), &isNull);
1127 		if (isNull)
1128 			elog(ERROR, "null conkey for constraint %u",
1129 				 ((Form_pg_constraint) GETSTRUCT(tuple))->oid);
1130 		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1131 		numkeys = ARR_DIMS(arr)[0];
1132 		if (ARR_NDIM(arr) != 1 ||
1133 			numkeys < 0 ||
1134 			ARR_HASNULL(arr) ||
1135 			ARR_ELEMTYPE(arr) != INT2OID)
1136 			elog(ERROR, "conkey is not a 1-D smallint array");
1137 		attnums = (int16 *) ARR_DATA_PTR(arr);
1138 
1139 		/* Construct the result value */
1140 		for (i = 0; i < numkeys; i++)
1141 		{
1142 			pkattnos = bms_add_member(pkattnos,
1143 									  attnums[i] - FirstLowInvalidHeapAttributeNumber);
1144 		}
1145 		*constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1146 
1147 		/* No need to search further */
1148 		break;
1149 	}
1150 
1151 	systable_endscan(scan);
1152 
1153 	table_close(pg_constraint, AccessShareLock);
1154 
1155 	return pkattnos;
1156 }
1157 
1158 /*
1159  * Extract data from the pg_constraint tuple of a foreign-key constraint.
1160  *
1161  * All arguments save the first are output arguments; the last three of them
1162  * can be passed as NULL if caller doesn't need them.
1163  */
1164 void
DeconstructFkConstraintRow(HeapTuple tuple,int * numfks,AttrNumber * conkey,AttrNumber * confkey,Oid * pf_eq_oprs,Oid * pp_eq_oprs,Oid * ff_eq_oprs)1165 DeconstructFkConstraintRow(HeapTuple tuple, int *numfks,
1166 						   AttrNumber *conkey, AttrNumber *confkey,
1167 						   Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs)
1168 {
1169 	Oid			constrId;
1170 	Datum		adatum;
1171 	bool		isNull;
1172 	ArrayType  *arr;
1173 	int			numkeys;
1174 
1175 	constrId = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1176 
1177 	/*
1178 	 * We expect the arrays to be 1-D arrays of the right types; verify that.
1179 	 * We don't need to use deconstruct_array() since the array data is just
1180 	 * going to look like a C array of values.
1181 	 */
1182 	adatum = SysCacheGetAttr(CONSTROID, tuple,
1183 							 Anum_pg_constraint_conkey, &isNull);
1184 	if (isNull)
1185 		elog(ERROR, "null conkey for constraint %u", constrId);
1186 	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1187 	if (ARR_NDIM(arr) != 1 ||
1188 		ARR_HASNULL(arr) ||
1189 		ARR_ELEMTYPE(arr) != INT2OID)
1190 		elog(ERROR, "conkey is not a 1-D smallint array");
1191 	numkeys = ARR_DIMS(arr)[0];
1192 	if (numkeys <= 0 || numkeys > INDEX_MAX_KEYS)
1193 		elog(ERROR, "foreign key constraint cannot have %d columns", numkeys);
1194 	memcpy(conkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1195 	if ((Pointer) arr != DatumGetPointer(adatum))
1196 		pfree(arr);				/* free de-toasted copy, if any */
1197 
1198 	adatum = SysCacheGetAttr(CONSTROID, tuple,
1199 							 Anum_pg_constraint_confkey, &isNull);
1200 	if (isNull)
1201 		elog(ERROR, "null confkey for constraint %u", constrId);
1202 	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1203 	if (ARR_NDIM(arr) != 1 ||
1204 		ARR_DIMS(arr)[0] != numkeys ||
1205 		ARR_HASNULL(arr) ||
1206 		ARR_ELEMTYPE(arr) != INT2OID)
1207 		elog(ERROR, "confkey is not a 1-D smallint array");
1208 	memcpy(confkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1209 	if ((Pointer) arr != DatumGetPointer(adatum))
1210 		pfree(arr);				/* free de-toasted copy, if any */
1211 
1212 	if (pf_eq_oprs)
1213 	{
1214 		adatum = SysCacheGetAttr(CONSTROID, tuple,
1215 								 Anum_pg_constraint_conpfeqop, &isNull);
1216 		if (isNull)
1217 			elog(ERROR, "null conpfeqop for constraint %u", constrId);
1218 		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1219 		/* see TryReuseForeignKey if you change the test below */
1220 		if (ARR_NDIM(arr) != 1 ||
1221 			ARR_DIMS(arr)[0] != numkeys ||
1222 			ARR_HASNULL(arr) ||
1223 			ARR_ELEMTYPE(arr) != OIDOID)
1224 			elog(ERROR, "conpfeqop is not a 1-D Oid array");
1225 		memcpy(pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1226 		if ((Pointer) arr != DatumGetPointer(adatum))
1227 			pfree(arr);			/* free de-toasted copy, if any */
1228 	}
1229 
1230 	if (pp_eq_oprs)
1231 	{
1232 		adatum = SysCacheGetAttr(CONSTROID, tuple,
1233 								 Anum_pg_constraint_conppeqop, &isNull);
1234 		if (isNull)
1235 			elog(ERROR, "null conppeqop for constraint %u", constrId);
1236 		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1237 		if (ARR_NDIM(arr) != 1 ||
1238 			ARR_DIMS(arr)[0] != numkeys ||
1239 			ARR_HASNULL(arr) ||
1240 			ARR_ELEMTYPE(arr) != OIDOID)
1241 			elog(ERROR, "conppeqop is not a 1-D Oid array");
1242 		memcpy(pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1243 		if ((Pointer) arr != DatumGetPointer(adatum))
1244 			pfree(arr);			/* free de-toasted copy, if any */
1245 	}
1246 
1247 	if (ff_eq_oprs)
1248 	{
1249 		adatum = SysCacheGetAttr(CONSTROID, tuple,
1250 								 Anum_pg_constraint_conffeqop, &isNull);
1251 		if (isNull)
1252 			elog(ERROR, "null conffeqop for constraint %u", constrId);
1253 		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
1254 		if (ARR_NDIM(arr) != 1 ||
1255 			ARR_DIMS(arr)[0] != numkeys ||
1256 			ARR_HASNULL(arr) ||
1257 			ARR_ELEMTYPE(arr) != OIDOID)
1258 			elog(ERROR, "conffeqop is not a 1-D Oid array");
1259 		memcpy(ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1260 		if ((Pointer) arr != DatumGetPointer(adatum))
1261 			pfree(arr);			/* free de-toasted copy, if any */
1262 	}
1263 
1264 	*numfks = numkeys;
1265 }
1266 
1267 /*
1268  * Determine whether a relation can be proven functionally dependent on
1269  * a set of grouping columns.  If so, return true and add the pg_constraint
1270  * OIDs of the constraints needed for the proof to the *constraintDeps list.
1271  *
1272  * grouping_columns is a list of grouping expressions, in which columns of
1273  * the rel of interest are Vars with the indicated varno/varlevelsup.
1274  *
1275  * Currently we only check to see if the rel has a primary key that is a
1276  * subset of the grouping_columns.  We could also use plain unique constraints
1277  * if all their columns are known not null, but there's a problem: we need
1278  * to be able to represent the not-null-ness as part of the constraints added
1279  * to *constraintDeps.  FIXME whenever not-null constraints get represented
1280  * in pg_constraint.
1281  */
1282 bool
check_functional_grouping(Oid relid,Index varno,Index varlevelsup,List * grouping_columns,List ** constraintDeps)1283 check_functional_grouping(Oid relid,
1284 						  Index varno, Index varlevelsup,
1285 						  List *grouping_columns,
1286 						  List **constraintDeps)
1287 {
1288 	Bitmapset  *pkattnos;
1289 	Bitmapset  *groupbyattnos;
1290 	Oid			constraintOid;
1291 	ListCell   *gl;
1292 
1293 	/* If the rel has no PK, then we can't prove functional dependency */
1294 	pkattnos = get_primary_key_attnos(relid, false, &constraintOid);
1295 	if (pkattnos == NULL)
1296 		return false;
1297 
1298 	/* Identify all the rel's columns that appear in grouping_columns */
1299 	groupbyattnos = NULL;
1300 	foreach(gl, grouping_columns)
1301 	{
1302 		Var		   *gvar = (Var *) lfirst(gl);
1303 
1304 		if (IsA(gvar, Var) &&
1305 			gvar->varno == varno &&
1306 			gvar->varlevelsup == varlevelsup)
1307 			groupbyattnos = bms_add_member(groupbyattnos,
1308 										   gvar->varattno - FirstLowInvalidHeapAttributeNumber);
1309 	}
1310 
1311 	if (bms_is_subset(pkattnos, groupbyattnos))
1312 	{
1313 		/* The PK is a subset of grouping_columns, so we win */
1314 		*constraintDeps = lappend_oid(*constraintDeps, constraintOid);
1315 		return true;
1316 	}
1317 
1318 	return false;
1319 }
1320