1 /*-------------------------------------------------------------------------
2  *
3  * ri_triggers.c
4  *
5  *	Generic trigger procedures for referential integrity constraint
6  *	checks.
7  *
8  *	Note about memory management: the private hashtables kept here live
9  *	across query and transaction boundaries, in fact they live as long as
10  *	the backend does.  This works because the hashtable structures
11  *	themselves are allocated by dynahash.c in its permanent DynaHashCxt,
12  *	and the SPI plans they point to are saved using SPI_keepplan().
13  *	There is not currently any provision for throwing away a no-longer-needed
14  *	plan --- consider improving this someday.
15  *
16  *
17  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
18  *
19  * src/backend/utils/adt/ri_triggers.c
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "access/htup_details.h"
27 #include "access/sysattr.h"
28 #include "access/table.h"
29 #include "access/tableam.h"
30 #include "access/xact.h"
31 #include "catalog/pg_collation.h"
32 #include "catalog/pg_constraint.h"
33 #include "catalog/pg_operator.h"
34 #include "catalog/pg_type.h"
35 #include "commands/trigger.h"
36 #include "executor/executor.h"
37 #include "executor/spi.h"
38 #include "lib/ilist.h"
39 #include "miscadmin.h"
40 #include "parser/parse_coerce.h"
41 #include "parser/parse_relation.h"
42 #include "storage/bufmgr.h"
43 #include "utils/acl.h"
44 #include "utils/builtins.h"
45 #include "utils/datum.h"
46 #include "utils/fmgroids.h"
47 #include "utils/guc.h"
48 #include "utils/inval.h"
49 #include "utils/lsyscache.h"
50 #include "utils/memutils.h"
51 #include "utils/rel.h"
52 #include "utils/rls.h"
53 #include "utils/ruleutils.h"
54 #include "utils/snapmgr.h"
55 #include "utils/syscache.h"
56 
57 /*
58  * Local definitions
59  */
60 
/*
 * Upper bound on the number of key columns per constraint; sizes the
 * per-column arrays in RI_ConstraintInfo and the parameter-type arrays
 * built when a query is planned.
 */
#define RI_MAX_NUMKEYS					INDEX_MAX_KEYS

/*
 * Hash table sizing.
 * NOTE(review): presumably the initial sizes handed to the hash tables by
 * ri_InitHashTables() -- confirm there.
 */
#define RI_INIT_CONSTRAINTHASHSIZE		64
#define RI_INIT_QUERYHASHSIZE			(RI_INIT_CONSTRAINTHASHSIZE * 4)

/* Result codes of ri_NullCheck(): NULL-ness of a row's key columns */
#define RI_KEYS_ALL_NULL				0
#define RI_KEYS_SOME_NULL				1
#define RI_KEYS_NONE_NULL				2

/* RI query type codes */
/* these queries are executed against the PK (referenced) table: */
#define RI_PLAN_CHECK_LOOKUPPK			1
#define RI_PLAN_CHECK_LOOKUPPK_FROM_PK	2
/* highest query code that runs against the PK table */
#define RI_PLAN_LAST_ON_PK				RI_PLAN_CHECK_LOOKUPPK_FROM_PK
/* these queries are executed against the FK (referencing) table: */
#define RI_PLAN_CASCADE_DEL_DODELETE	3
#define RI_PLAN_CASCADE_UPD_DOUPDATE	4
#define RI_PLAN_RESTRICT_CHECKREF		5
#define RI_PLAN_SETNULL_DOUPDATE		6
#define RI_PLAN_SETDEFAULT_DOUPDATE		7

/*
 * Buffer sizes for the output of quoteOneName() and quoteRelationName().
 * NOTE(review): the "*2+3" presumably allows for every character being
 * doubled plus two quote marks and a terminator -- confirm in quoteOneName.
 */
#define MAX_QUOTED_NAME_LEN  (NAMEDATALEN*2+3)
#define MAX_QUOTED_REL_NAME_LEN  (MAX_QUOTED_NAME_LEN*2)

/* Shorthands to look up a column's name, type and collation by attnum */
#define RIAttName(rel, attnum)	NameStr(*attnumAttName(rel, attnum))
#define RIAttType(rel, attnum)	attnumTypeId(rel, attnum)
#define RIAttCollation(rel, attnum) attnumCollationId(rel, attnum)

/* Trigger event kinds, passed to ri_CheckTrigger() as "tgkind" */
#define RI_TRIGTYPE_INSERT 1
#define RI_TRIGTYPE_UPDATE 2
#define RI_TRIGTYPE_DELETE 3
92 
93 
/*
 * RI_ConstraintInfo
 *
 * Information extracted from an FK pg_constraint entry.  This is cached in
 * ri_constraint_cache.
 *
 * The per-column arrays have room for RI_MAX_NUMKEYS entries; the query
 * builders in this file read elements 0 .. nkeys-1 only.  Element i of each
 * array describes the same key column pair.
 */
typedef struct RI_ConstraintInfo
{
	Oid			constraint_id;	/* OID of pg_constraint entry (hash key) */
	bool		valid;			/* successfully initialized? */
	uint32		oidHashValue;	/* hash value of pg_constraint OID */
	NameData	conname;		/* name of the FK constraint */
	Oid			pk_relid;		/* referenced relation */
	Oid			fk_relid;		/* referencing relation */
	char		confupdtype;	/* foreign key's ON UPDATE action */
	char		confdeltype;	/* foreign key's ON DELETE action */
	char		confmatchtype;	/* foreign key's match type */
	int			nkeys;			/* number of key columns */
	int16		pk_attnums[RI_MAX_NUMKEYS]; /* attnums of referenced cols */
	int16		fk_attnums[RI_MAX_NUMKEYS]; /* attnums of referencing cols */
	Oid			pf_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = FK) */
	Oid			pp_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = PK) */
	Oid			ff_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (FK = FK) */
	dlist_node	valid_link;		/* Link in list of valid entries */
} RI_ConstraintInfo;
119 
/*
 * RI_QueryKey
 *
 * The key identifying a prepared SPI plan in our query hashtable.  Callers
 * fill it with ri_BuildQueryKey() and look the plan up with
 * ri_FetchPreparedPlan().
 */
typedef struct RI_QueryKey
{
	Oid			constr_id;		/* OID of pg_constraint entry */
	int32		constr_queryno; /* query type ID, see RI_PLAN_XXX above */
} RI_QueryKey;
130 
/*
 * RI_QueryHashEntry
 *
 * One entry of the query hashtable: a query key and the SPI plan stored
 * under it.
 */
typedef struct RI_QueryHashEntry
{
	RI_QueryKey key;			/* identifies constraint and query type */
	SPIPlanPtr	plan;			/* the prepared plan for that query */
} RI_QueryHashEntry;
139 
/*
 * RI_CompareKey
 *
 * The key identifying an entry showing how to compare two values.
 * NOTE(review): presumably the lookup key used by ri_HashCompareOp(), whose
 * arguments have the same names and types -- confirm there.
 */
typedef struct RI_CompareKey
{
	Oid			eq_opr;			/* the equality operator to apply */
	Oid			typeid;			/* the data type to apply it to */
} RI_CompareKey;
150 
/*
 * RI_CompareHashEntry
 *
 * Cached call information for comparing two values of one data type with
 * one equality operator, identified by the embedded RI_CompareKey.
 */
typedef struct RI_CompareHashEntry
{
	RI_CompareKey key;
	bool		valid;			/* successfully initialized? */
	FmgrInfo	eq_opr_finfo;	/* call info for equality fn */
	FmgrInfo	cast_func_finfo;	/* in case we must coerce input */
} RI_CompareHashEntry;
161 
162 
/*
 * Local data
 *
 * All of these start out NULL/zero.
 * NOTE(review): presumably the tables are created by ri_InitHashTables() --
 * confirm there.
 */
/* cache of RI_ConstraintInfo entries */
static HTAB *ri_constraint_cache = NULL;
/* prepared SPI plans; NOTE(review): presumably RI_QueryHashEntry keyed by RI_QueryKey */
static HTAB *ri_query_cache = NULL;
/* NOTE(review): presumably RI_CompareHashEntry keyed by RI_CompareKey */
static HTAB *ri_compare_cache = NULL;
/* NOTE(review): presumably the list RI_ConstraintInfo.valid_link is a member of */
static dlist_head ri_constraint_cache_valid_list;
/* NOTE(review): presumably the number of entries on the list above */
static int	ri_constraint_cache_valid_count = 0;
171 
172 
/*
 * Local function prototypes
 */

/* shared bodies of the trigger entry points */
static bool ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
							  TupleTableSlot *oldslot,
							  const RI_ConstraintInfo *riinfo);
static Datum ri_restrict(TriggerData *trigdata, bool is_no_action);
static Datum ri_set(TriggerData *trigdata, bool is_set_null);

/* helpers for building query text */
static void quoteOneName(char *buffer, const char *name);
static void quoteRelationName(char *buffer, Relation rel);
static void ri_GenerateQual(StringInfo buf,
							const char *sep,
							const char *leftop, Oid leftoptype,
							Oid opoid,
							const char *rightop, Oid rightoptype);
static void ri_GenerateQualCollation(StringInfo buf, Oid collation);

/* key inspection and comparison */
static int	ri_NullCheck(TupleDesc tupdesc, TupleTableSlot *slot,
						 const RI_ConstraintInfo *riinfo, bool rel_is_pk);
static void ri_BuildQueryKey(RI_QueryKey *key,
							 const RI_ConstraintInfo *riinfo,
							 int32 constr_queryno);
static bool ri_KeysEqual(Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot,
						 const RI_ConstraintInfo *riinfo, bool rel_is_pk);
static bool ri_AttributesEqual(Oid eq_opr, Oid typeid,
							   Datum oldvalue, Datum newvalue);

/* hash table maintenance */
static void ri_InitHashTables(void);
static void InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue);
static SPIPlanPtr ri_FetchPreparedPlan(RI_QueryKey *key);
static void ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan);
static RI_CompareHashEntry *ri_HashCompareOp(Oid eq_opr, Oid typeid);

/* trigger validation, constraint lookup, planning and execution */
static void ri_CheckTrigger(FunctionCallInfo fcinfo, const char *funcname,
							int tgkind);
static const RI_ConstraintInfo *ri_FetchConstraintInfo(Trigger *trigger,
													   Relation trig_rel, bool rel_is_pk);
static const RI_ConstraintInfo *ri_LoadConstraintInfo(Oid constraintOid);
static SPIPlanPtr ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
							   RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel);
static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo,
							RI_QueryKey *qkey, SPIPlanPtr qplan,
							Relation fk_rel, Relation pk_rel,
							TupleTableSlot *oldslot, TupleTableSlot *newslot,
							bool detectNewRows, int expect_OK);
static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
							 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
							 Datum *vals, char *nulls);
/* declared noreturn: never hands control back to its caller */
static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
							   Relation pk_rel, Relation fk_rel,
							   TupleTableSlot *violatorslot, TupleDesc tupdesc,
							   int queryno, bool partgone) pg_attribute_noreturn();
224 
225 
226 /*
227  * RI_FKey_check -
228  *
229  * Check foreign key existence (combined for INSERT and UPDATE).
230  */
231 static Datum
RI_FKey_check(TriggerData * trigdata)232 RI_FKey_check(TriggerData *trigdata)
233 {
234 	const RI_ConstraintInfo *riinfo;
235 	Relation	fk_rel;
236 	Relation	pk_rel;
237 	TupleTableSlot *newslot;
238 	RI_QueryKey qkey;
239 	SPIPlanPtr	qplan;
240 
241 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
242 									trigdata->tg_relation, false);
243 
244 	if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
245 		newslot = trigdata->tg_newslot;
246 	else
247 		newslot = trigdata->tg_trigslot;
248 
249 	/*
250 	 * We should not even consider checking the row if it is no longer valid,
251 	 * since it was either deleted (so the deferred check should be skipped)
252 	 * or updated (in which case only the latest version of the row should be
253 	 * checked).  Test its liveness according to SnapshotSelf.  We need pin
254 	 * and lock on the buffer to call HeapTupleSatisfiesVisibility.  Caller
255 	 * should be holding pin, but not lock.
256 	 */
257 	if (!table_tuple_satisfies_snapshot(trigdata->tg_relation, newslot, SnapshotSelf))
258 		return PointerGetDatum(NULL);
259 
260 	/*
261 	 * Get the relation descriptors of the FK and PK tables.
262 	 *
263 	 * pk_rel is opened in RowShareLock mode since that's what our eventual
264 	 * SELECT FOR KEY SHARE will get on it.
265 	 */
266 	fk_rel = trigdata->tg_relation;
267 	pk_rel = table_open(riinfo->pk_relid, RowShareLock);
268 
269 	switch (ri_NullCheck(RelationGetDescr(fk_rel), newslot, riinfo, false))
270 	{
271 		case RI_KEYS_ALL_NULL:
272 
273 			/*
274 			 * No further check needed - an all-NULL key passes every type of
275 			 * foreign key constraint.
276 			 */
277 			table_close(pk_rel, RowShareLock);
278 			return PointerGetDatum(NULL);
279 
280 		case RI_KEYS_SOME_NULL:
281 
282 			/*
283 			 * This is the only case that differs between the three kinds of
284 			 * MATCH.
285 			 */
286 			switch (riinfo->confmatchtype)
287 			{
288 				case FKCONSTR_MATCH_FULL:
289 
290 					/*
291 					 * Not allowed - MATCH FULL says either all or none of the
292 					 * attributes can be NULLs
293 					 */
294 					ereport(ERROR,
295 							(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
296 							 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
297 									RelationGetRelationName(fk_rel),
298 									NameStr(riinfo->conname)),
299 							 errdetail("MATCH FULL does not allow mixing of null and nonnull key values."),
300 							 errtableconstraint(fk_rel,
301 												NameStr(riinfo->conname))));
302 					table_close(pk_rel, RowShareLock);
303 					return PointerGetDatum(NULL);
304 
305 				case FKCONSTR_MATCH_SIMPLE:
306 
307 					/*
308 					 * MATCH SIMPLE - if ANY column is null, the key passes
309 					 * the constraint.
310 					 */
311 					table_close(pk_rel, RowShareLock);
312 					return PointerGetDatum(NULL);
313 
314 #ifdef NOT_USED
315 				case FKCONSTR_MATCH_PARTIAL:
316 
317 					/*
318 					 * MATCH PARTIAL - all non-null columns must match. (not
319 					 * implemented, can be done by modifying the query below
320 					 * to only include non-null columns, or by writing a
321 					 * special version here)
322 					 */
323 					break;
324 #endif
325 			}
326 
327 		case RI_KEYS_NONE_NULL:
328 
329 			/*
330 			 * Have a full qualified key - continue below for all three kinds
331 			 * of MATCH.
332 			 */
333 			break;
334 	}
335 
336 	if (SPI_connect() != SPI_OK_CONNECT)
337 		elog(ERROR, "SPI_connect failed");
338 
339 	/* Fetch or prepare a saved plan for the real check */
340 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
341 
342 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
343 	{
344 		StringInfoData querybuf;
345 		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
346 		char		attname[MAX_QUOTED_NAME_LEN];
347 		char		paramname[16];
348 		const char *querysep;
349 		Oid			queryoids[RI_MAX_NUMKEYS];
350 		const char *pk_only;
351 
352 		/* ----------
353 		 * The query string built is
354 		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
355 		 *		   FOR KEY SHARE OF x
356 		 * The type id's for the $ parameters are those of the
357 		 * corresponding FK attributes.
358 		 * ----------
359 		 */
360 		initStringInfo(&querybuf);
361 		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
362 			"" : "ONLY ";
363 		quoteRelationName(pkrelname, pk_rel);
364 		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
365 						 pk_only, pkrelname);
366 		querysep = "WHERE";
367 		for (int i = 0; i < riinfo->nkeys; i++)
368 		{
369 			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
370 			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
371 
372 			quoteOneName(attname,
373 						 RIAttName(pk_rel, riinfo->pk_attnums[i]));
374 			sprintf(paramname, "$%d", i + 1);
375 			ri_GenerateQual(&querybuf, querysep,
376 							attname, pk_type,
377 							riinfo->pf_eq_oprs[i],
378 							paramname, fk_type);
379 			querysep = "AND";
380 			queryoids[i] = fk_type;
381 		}
382 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
383 
384 		/* Prepare and save the plan */
385 		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
386 							 &qkey, fk_rel, pk_rel);
387 	}
388 
389 	/*
390 	 * Now check that foreign key exists in PK table
391 	 */
392 	ri_PerformCheck(riinfo, &qkey, qplan,
393 					fk_rel, pk_rel,
394 					NULL, newslot,
395 					false,
396 					SPI_OK_SELECT);
397 
398 	if (SPI_finish() != SPI_OK_FINISH)
399 		elog(ERROR, "SPI_finish failed");
400 
401 	table_close(pk_rel, RowShareLock);
402 
403 	return PointerGetDatum(NULL);
404 }
405 
406 
407 /*
408  * RI_FKey_check_ins -
409  *
410  * Check foreign key existence at insert event on FK table.
411  */
412 Datum
RI_FKey_check_ins(PG_FUNCTION_ARGS)413 RI_FKey_check_ins(PG_FUNCTION_ARGS)
414 {
415 	/* Check that this is a valid trigger call on the right time and event. */
416 	ri_CheckTrigger(fcinfo, "RI_FKey_check_ins", RI_TRIGTYPE_INSERT);
417 
418 	/* Share code with UPDATE case. */
419 	return RI_FKey_check((TriggerData *) fcinfo->context);
420 }
421 
422 
423 /*
424  * RI_FKey_check_upd -
425  *
426  * Check foreign key existence at update event on FK table.
427  */
428 Datum
RI_FKey_check_upd(PG_FUNCTION_ARGS)429 RI_FKey_check_upd(PG_FUNCTION_ARGS)
430 {
431 	/* Check that this is a valid trigger call on the right time and event. */
432 	ri_CheckTrigger(fcinfo, "RI_FKey_check_upd", RI_TRIGTYPE_UPDATE);
433 
434 	/* Share code with INSERT case. */
435 	return RI_FKey_check((TriggerData *) fcinfo->context);
436 }
437 
438 
439 /*
440  * ri_Check_Pk_Match
441  *
442  * Check to see if another PK row has been created that provides the same
443  * key values as the "oldslot" that's been modified or deleted in our trigger
444  * event.  Returns true if a match is found in the PK table.
445  *
446  * We assume the caller checked that the oldslot contains no NULL key values,
447  * since otherwise a match is impossible.
448  */
449 static bool
ri_Check_Pk_Match(Relation pk_rel,Relation fk_rel,TupleTableSlot * oldslot,const RI_ConstraintInfo * riinfo)450 ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
451 				  TupleTableSlot *oldslot,
452 				  const RI_ConstraintInfo *riinfo)
453 {
454 	SPIPlanPtr	qplan;
455 	RI_QueryKey qkey;
456 	bool		result;
457 
458 	/* Only called for non-null rows */
459 	Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL);
460 
461 	if (SPI_connect() != SPI_OK_CONNECT)
462 		elog(ERROR, "SPI_connect failed");
463 
464 	/*
465 	 * Fetch or prepare a saved plan for checking PK table with values coming
466 	 * from a PK row
467 	 */
468 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK_FROM_PK);
469 
470 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
471 	{
472 		StringInfoData querybuf;
473 		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
474 		char		attname[MAX_QUOTED_NAME_LEN];
475 		char		paramname[16];
476 		const char *querysep;
477 		const char *pk_only;
478 		Oid			queryoids[RI_MAX_NUMKEYS];
479 
480 		/* ----------
481 		 * The query string built is
482 		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
483 		 *		   FOR KEY SHARE OF x
484 		 * The type id's for the $ parameters are those of the
485 		 * PK attributes themselves.
486 		 * ----------
487 		 */
488 		initStringInfo(&querybuf);
489 		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
490 			"" : "ONLY ";
491 		quoteRelationName(pkrelname, pk_rel);
492 		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
493 						 pk_only, pkrelname);
494 		querysep = "WHERE";
495 		for (int i = 0; i < riinfo->nkeys; i++)
496 		{
497 			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
498 
499 			quoteOneName(attname,
500 						 RIAttName(pk_rel, riinfo->pk_attnums[i]));
501 			sprintf(paramname, "$%d", i + 1);
502 			ri_GenerateQual(&querybuf, querysep,
503 							attname, pk_type,
504 							riinfo->pp_eq_oprs[i],
505 							paramname, pk_type);
506 			querysep = "AND";
507 			queryoids[i] = pk_type;
508 		}
509 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
510 
511 		/* Prepare and save the plan */
512 		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
513 							 &qkey, fk_rel, pk_rel);
514 	}
515 
516 	/*
517 	 * We have a plan now. Run it.
518 	 */
519 	result = ri_PerformCheck(riinfo, &qkey, qplan,
520 							 fk_rel, pk_rel,
521 							 oldslot, NULL,
522 							 true,	/* treat like update */
523 							 SPI_OK_SELECT);
524 
525 	if (SPI_finish() != SPI_OK_FINISH)
526 		elog(ERROR, "SPI_finish failed");
527 
528 	return result;
529 }
530 
531 
532 /*
533  * RI_FKey_noaction_del -
534  *
535  * Give an error and roll back the current transaction if the
536  * delete has resulted in a violation of the given referential
537  * integrity constraint.
538  */
539 Datum
RI_FKey_noaction_del(PG_FUNCTION_ARGS)540 RI_FKey_noaction_del(PG_FUNCTION_ARGS)
541 {
542 	/* Check that this is a valid trigger call on the right time and event. */
543 	ri_CheckTrigger(fcinfo, "RI_FKey_noaction_del", RI_TRIGTYPE_DELETE);
544 
545 	/* Share code with RESTRICT/UPDATE cases. */
546 	return ri_restrict((TriggerData *) fcinfo->context, true);
547 }
548 
549 /*
550  * RI_FKey_restrict_del -
551  *
552  * Restrict delete from PK table to rows unreferenced by foreign key.
553  *
554  * The SQL standard intends that this referential action occur exactly when
555  * the delete is performed, rather than after.  This appears to be
556  * the only difference between "NO ACTION" and "RESTRICT".  In Postgres
557  * we still implement this as an AFTER trigger, but it's non-deferrable.
558  */
559 Datum
RI_FKey_restrict_del(PG_FUNCTION_ARGS)560 RI_FKey_restrict_del(PG_FUNCTION_ARGS)
561 {
562 	/* Check that this is a valid trigger call on the right time and event. */
563 	ri_CheckTrigger(fcinfo, "RI_FKey_restrict_del", RI_TRIGTYPE_DELETE);
564 
565 	/* Share code with NO ACTION/UPDATE cases. */
566 	return ri_restrict((TriggerData *) fcinfo->context, false);
567 }
568 
569 /*
570  * RI_FKey_noaction_upd -
571  *
572  * Give an error and roll back the current transaction if the
573  * update has resulted in a violation of the given referential
574  * integrity constraint.
575  */
576 Datum
RI_FKey_noaction_upd(PG_FUNCTION_ARGS)577 RI_FKey_noaction_upd(PG_FUNCTION_ARGS)
578 {
579 	/* Check that this is a valid trigger call on the right time and event. */
580 	ri_CheckTrigger(fcinfo, "RI_FKey_noaction_upd", RI_TRIGTYPE_UPDATE);
581 
582 	/* Share code with RESTRICT/DELETE cases. */
583 	return ri_restrict((TriggerData *) fcinfo->context, true);
584 }
585 
586 /*
587  * RI_FKey_restrict_upd -
588  *
589  * Restrict update of PK to rows unreferenced by foreign key.
590  *
591  * The SQL standard intends that this referential action occur exactly when
592  * the update is performed, rather than after.  This appears to be
593  * the only difference between "NO ACTION" and "RESTRICT".  In Postgres
594  * we still implement this as an AFTER trigger, but it's non-deferrable.
595  */
596 Datum
RI_FKey_restrict_upd(PG_FUNCTION_ARGS)597 RI_FKey_restrict_upd(PG_FUNCTION_ARGS)
598 {
599 	/* Check that this is a valid trigger call on the right time and event. */
600 	ri_CheckTrigger(fcinfo, "RI_FKey_restrict_upd", RI_TRIGTYPE_UPDATE);
601 
602 	/* Share code with NO ACTION/DELETE cases. */
603 	return ri_restrict((TriggerData *) fcinfo->context, false);
604 }
605 
606 /*
607  * ri_restrict -
608  *
609  * Common code for ON DELETE RESTRICT, ON DELETE NO ACTION,
610  * ON UPDATE RESTRICT, and ON UPDATE NO ACTION.
611  */
612 static Datum
ri_restrict(TriggerData * trigdata,bool is_no_action)613 ri_restrict(TriggerData *trigdata, bool is_no_action)
614 {
615 	const RI_ConstraintInfo *riinfo;
616 	Relation	fk_rel;
617 	Relation	pk_rel;
618 	TupleTableSlot *oldslot;
619 	RI_QueryKey qkey;
620 	SPIPlanPtr	qplan;
621 
622 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
623 									trigdata->tg_relation, true);
624 
625 	/*
626 	 * Get the relation descriptors of the FK and PK tables and the old tuple.
627 	 *
628 	 * fk_rel is opened in RowShareLock mode since that's what our eventual
629 	 * SELECT FOR KEY SHARE will get on it.
630 	 */
631 	fk_rel = table_open(riinfo->fk_relid, RowShareLock);
632 	pk_rel = trigdata->tg_relation;
633 	oldslot = trigdata->tg_trigslot;
634 
635 	/*
636 	 * If another PK row now exists providing the old key values, we should
637 	 * not do anything.  However, this check should only be made in the NO
638 	 * ACTION case; in RESTRICT cases we don't wish to allow another row to be
639 	 * substituted.
640 	 */
641 	if (is_no_action &&
642 		ri_Check_Pk_Match(pk_rel, fk_rel, oldslot, riinfo))
643 	{
644 		table_close(fk_rel, RowShareLock);
645 		return PointerGetDatum(NULL);
646 	}
647 
648 	if (SPI_connect() != SPI_OK_CONNECT)
649 		elog(ERROR, "SPI_connect failed");
650 
651 	/*
652 	 * Fetch or prepare a saved plan for the restrict lookup (it's the same
653 	 * query for delete and update cases)
654 	 */
655 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_RESTRICT_CHECKREF);
656 
657 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
658 	{
659 		StringInfoData querybuf;
660 		char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
661 		char		attname[MAX_QUOTED_NAME_LEN];
662 		char		paramname[16];
663 		const char *querysep;
664 		Oid			queryoids[RI_MAX_NUMKEYS];
665 		const char *fk_only;
666 
667 		/* ----------
668 		 * The query string built is
669 		 *	SELECT 1 FROM [ONLY] <fktable> x WHERE $1 = fkatt1 [AND ...]
670 		 *		   FOR KEY SHARE OF x
671 		 * The type id's for the $ parameters are those of the
672 		 * corresponding PK attributes.
673 		 * ----------
674 		 */
675 		initStringInfo(&querybuf);
676 		fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
677 			"" : "ONLY ";
678 		quoteRelationName(fkrelname, fk_rel);
679 		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
680 						 fk_only, fkrelname);
681 		querysep = "WHERE";
682 		for (int i = 0; i < riinfo->nkeys; i++)
683 		{
684 			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
685 			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
686 			Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
687 			Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
688 
689 			quoteOneName(attname,
690 						 RIAttName(fk_rel, riinfo->fk_attnums[i]));
691 			sprintf(paramname, "$%d", i + 1);
692 			ri_GenerateQual(&querybuf, querysep,
693 							paramname, pk_type,
694 							riinfo->pf_eq_oprs[i],
695 							attname, fk_type);
696 			if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
697 				ri_GenerateQualCollation(&querybuf, pk_coll);
698 			querysep = "AND";
699 			queryoids[i] = pk_type;
700 		}
701 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
702 
703 		/* Prepare and save the plan */
704 		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
705 							 &qkey, fk_rel, pk_rel);
706 	}
707 
708 	/*
709 	 * We have a plan now. Run it to check for existing references.
710 	 */
711 	ri_PerformCheck(riinfo, &qkey, qplan,
712 					fk_rel, pk_rel,
713 					oldslot, NULL,
714 					true,		/* must detect new rows */
715 					SPI_OK_SELECT);
716 
717 	if (SPI_finish() != SPI_OK_FINISH)
718 		elog(ERROR, "SPI_finish failed");
719 
720 	table_close(fk_rel, RowShareLock);
721 
722 	return PointerGetDatum(NULL);
723 }
724 
725 
726 /*
727  * RI_FKey_cascade_del -
728  *
729  * Cascaded delete foreign key references at delete event on PK table.
730  */
731 Datum
RI_FKey_cascade_del(PG_FUNCTION_ARGS)732 RI_FKey_cascade_del(PG_FUNCTION_ARGS)
733 {
734 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
735 	const RI_ConstraintInfo *riinfo;
736 	Relation	fk_rel;
737 	Relation	pk_rel;
738 	TupleTableSlot *oldslot;
739 	RI_QueryKey qkey;
740 	SPIPlanPtr	qplan;
741 
742 	/* Check that this is a valid trigger call on the right time and event. */
743 	ri_CheckTrigger(fcinfo, "RI_FKey_cascade_del", RI_TRIGTYPE_DELETE);
744 
745 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
746 									trigdata->tg_relation, true);
747 
748 	/*
749 	 * Get the relation descriptors of the FK and PK tables and the old tuple.
750 	 *
751 	 * fk_rel is opened in RowExclusiveLock mode since that's what our
752 	 * eventual DELETE will get on it.
753 	 */
754 	fk_rel = table_open(riinfo->fk_relid, RowExclusiveLock);
755 	pk_rel = trigdata->tg_relation;
756 	oldslot = trigdata->tg_trigslot;
757 
758 	if (SPI_connect() != SPI_OK_CONNECT)
759 		elog(ERROR, "SPI_connect failed");
760 
761 	/* Fetch or prepare a saved plan for the cascaded delete */
762 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_DEL_DODELETE);
763 
764 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
765 	{
766 		StringInfoData querybuf;
767 		char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
768 		char		attname[MAX_QUOTED_NAME_LEN];
769 		char		paramname[16];
770 		const char *querysep;
771 		Oid			queryoids[RI_MAX_NUMKEYS];
772 		const char *fk_only;
773 
774 		/* ----------
775 		 * The query string built is
776 		 *	DELETE FROM [ONLY] <fktable> WHERE $1 = fkatt1 [AND ...]
777 		 * The type id's for the $ parameters are those of the
778 		 * corresponding PK attributes.
779 		 * ----------
780 		 */
781 		initStringInfo(&querybuf);
782 		fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
783 			"" : "ONLY ";
784 		quoteRelationName(fkrelname, fk_rel);
785 		appendStringInfo(&querybuf, "DELETE FROM %s%s",
786 						 fk_only, fkrelname);
787 		querysep = "WHERE";
788 		for (int i = 0; i < riinfo->nkeys; i++)
789 		{
790 			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
791 			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
792 			Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
793 			Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
794 
795 			quoteOneName(attname,
796 						 RIAttName(fk_rel, riinfo->fk_attnums[i]));
797 			sprintf(paramname, "$%d", i + 1);
798 			ri_GenerateQual(&querybuf, querysep,
799 							paramname, pk_type,
800 							riinfo->pf_eq_oprs[i],
801 							attname, fk_type);
802 			if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
803 				ri_GenerateQualCollation(&querybuf, pk_coll);
804 			querysep = "AND";
805 			queryoids[i] = pk_type;
806 		}
807 
808 		/* Prepare and save the plan */
809 		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
810 							 &qkey, fk_rel, pk_rel);
811 	}
812 
813 	/*
814 	 * We have a plan now. Build up the arguments from the key values in the
815 	 * deleted PK tuple and delete the referencing rows
816 	 */
817 	ri_PerformCheck(riinfo, &qkey, qplan,
818 					fk_rel, pk_rel,
819 					oldslot, NULL,
820 					true,		/* must detect new rows */
821 					SPI_OK_DELETE);
822 
823 	if (SPI_finish() != SPI_OK_FINISH)
824 		elog(ERROR, "SPI_finish failed");
825 
826 	table_close(fk_rel, RowExclusiveLock);
827 
828 	return PointerGetDatum(NULL);
829 }
830 
831 
832 /*
833  * RI_FKey_cascade_upd -
834  *
835  * Cascaded update foreign key references at update event on PK table.
836  */
837 Datum
RI_FKey_cascade_upd(PG_FUNCTION_ARGS)838 RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
839 {
840 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
841 	const RI_ConstraintInfo *riinfo;
842 	Relation	fk_rel;
843 	Relation	pk_rel;
844 	TupleTableSlot *newslot;
845 	TupleTableSlot *oldslot;
846 	RI_QueryKey qkey;
847 	SPIPlanPtr	qplan;
848 
849 	/* Check that this is a valid trigger call on the right time and event. */
850 	ri_CheckTrigger(fcinfo, "RI_FKey_cascade_upd", RI_TRIGTYPE_UPDATE);
851 
852 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
853 									trigdata->tg_relation, true);
854 
855 	/*
856 	 * Get the relation descriptors of the FK and PK tables and the new and
857 	 * old tuple.
858 	 *
859 	 * fk_rel is opened in RowExclusiveLock mode since that's what our
860 	 * eventual UPDATE will get on it.
861 	 */
862 	fk_rel = table_open(riinfo->fk_relid, RowExclusiveLock);
863 	pk_rel = trigdata->tg_relation;
864 	newslot = trigdata->tg_newslot;
865 	oldslot = trigdata->tg_trigslot;
866 
867 	if (SPI_connect() != SPI_OK_CONNECT)
868 		elog(ERROR, "SPI_connect failed");
869 
870 	/* Fetch or prepare a saved plan for the cascaded update */
871 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_UPD_DOUPDATE);
872 
873 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
874 	{
875 		StringInfoData querybuf;
876 		StringInfoData qualbuf;
877 		char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
878 		char		attname[MAX_QUOTED_NAME_LEN];
879 		char		paramname[16];
880 		const char *querysep;
881 		const char *qualsep;
882 		Oid			queryoids[RI_MAX_NUMKEYS * 2];
883 		const char *fk_only;
884 
885 		/* ----------
886 		 * The query string built is
887 		 *	UPDATE [ONLY] <fktable> SET fkatt1 = $1 [, ...]
888 		 *			WHERE $n = fkatt1 [AND ...]
889 		 * The type id's for the $ parameters are those of the
890 		 * corresponding PK attributes.  Note that we are assuming
891 		 * there is an assignment cast from the PK to the FK type;
892 		 * else the parser will fail.
893 		 * ----------
894 		 */
895 		initStringInfo(&querybuf);
896 		initStringInfo(&qualbuf);
897 		fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
898 			"" : "ONLY ";
899 		quoteRelationName(fkrelname, fk_rel);
900 		appendStringInfo(&querybuf, "UPDATE %s%s SET",
901 						 fk_only, fkrelname);
902 		querysep = "";
903 		qualsep = "WHERE";
904 		for (int i = 0, j = riinfo->nkeys; i < riinfo->nkeys; i++, j++)
905 		{
906 			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
907 			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
908 			Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
909 			Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
910 
911 			quoteOneName(attname,
912 						 RIAttName(fk_rel, riinfo->fk_attnums[i]));
913 			appendStringInfo(&querybuf,
914 							 "%s %s = $%d",
915 							 querysep, attname, i + 1);
916 			sprintf(paramname, "$%d", j + 1);
917 			ri_GenerateQual(&qualbuf, qualsep,
918 							paramname, pk_type,
919 							riinfo->pf_eq_oprs[i],
920 							attname, fk_type);
921 			if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
922 				ri_GenerateQualCollation(&querybuf, pk_coll);
923 			querysep = ",";
924 			qualsep = "AND";
925 			queryoids[i] = pk_type;
926 			queryoids[j] = pk_type;
927 		}
928 		appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len);
929 
930 		/* Prepare and save the plan */
931 		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys * 2, queryoids,
932 							 &qkey, fk_rel, pk_rel);
933 	}
934 
935 	/*
936 	 * We have a plan now. Run it to update the existing references.
937 	 */
938 	ri_PerformCheck(riinfo, &qkey, qplan,
939 					fk_rel, pk_rel,
940 					oldslot, newslot,
941 					true,		/* must detect new rows */
942 					SPI_OK_UPDATE);
943 
944 	if (SPI_finish() != SPI_OK_FINISH)
945 		elog(ERROR, "SPI_finish failed");
946 
947 	table_close(fk_rel, RowExclusiveLock);
948 
949 	return PointerGetDatum(NULL);
950 }
951 
952 
953 /*
954  * RI_FKey_setnull_del -
955  *
956  * Set foreign key references to NULL values at delete event on PK table.
957  */
958 Datum
RI_FKey_setnull_del(PG_FUNCTION_ARGS)959 RI_FKey_setnull_del(PG_FUNCTION_ARGS)
960 {
961 	/* Check that this is a valid trigger call on the right time and event. */
962 	ri_CheckTrigger(fcinfo, "RI_FKey_setnull_del", RI_TRIGTYPE_DELETE);
963 
964 	/* Share code with UPDATE case */
965 	return ri_set((TriggerData *) fcinfo->context, true);
966 }
967 
968 /*
969  * RI_FKey_setnull_upd -
970  *
971  * Set foreign key references to NULL at update event on PK table.
972  */
973 Datum
RI_FKey_setnull_upd(PG_FUNCTION_ARGS)974 RI_FKey_setnull_upd(PG_FUNCTION_ARGS)
975 {
976 	/* Check that this is a valid trigger call on the right time and event. */
977 	ri_CheckTrigger(fcinfo, "RI_FKey_setnull_upd", RI_TRIGTYPE_UPDATE);
978 
979 	/* Share code with DELETE case */
980 	return ri_set((TriggerData *) fcinfo->context, true);
981 }
982 
983 /*
984  * RI_FKey_setdefault_del -
985  *
986  * Set foreign key references to defaults at delete event on PK table.
987  */
988 Datum
RI_FKey_setdefault_del(PG_FUNCTION_ARGS)989 RI_FKey_setdefault_del(PG_FUNCTION_ARGS)
990 {
991 	/* Check that this is a valid trigger call on the right time and event. */
992 	ri_CheckTrigger(fcinfo, "RI_FKey_setdefault_del", RI_TRIGTYPE_DELETE);
993 
994 	/* Share code with UPDATE case */
995 	return ri_set((TriggerData *) fcinfo->context, false);
996 }
997 
998 /*
999  * RI_FKey_setdefault_upd -
1000  *
1001  * Set foreign key references to defaults at update event on PK table.
1002  */
1003 Datum
RI_FKey_setdefault_upd(PG_FUNCTION_ARGS)1004 RI_FKey_setdefault_upd(PG_FUNCTION_ARGS)
1005 {
1006 	/* Check that this is a valid trigger call on the right time and event. */
1007 	ri_CheckTrigger(fcinfo, "RI_FKey_setdefault_upd", RI_TRIGTYPE_UPDATE);
1008 
1009 	/* Share code with DELETE case */
1010 	return ri_set((TriggerData *) fcinfo->context, false);
1011 }
1012 
1013 /*
1014  * ri_set -
1015  *
1016  * Common code for ON DELETE SET NULL, ON DELETE SET DEFAULT, ON UPDATE SET
1017  * NULL, and ON UPDATE SET DEFAULT.
1018  */
1019 static Datum
ri_set(TriggerData * trigdata,bool is_set_null)1020 ri_set(TriggerData *trigdata, bool is_set_null)
1021 {
1022 	const RI_ConstraintInfo *riinfo;
1023 	Relation	fk_rel;
1024 	Relation	pk_rel;
1025 	TupleTableSlot *oldslot;
1026 	RI_QueryKey qkey;
1027 	SPIPlanPtr	qplan;
1028 
1029 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
1030 									trigdata->tg_relation, true);
1031 
1032 	/*
1033 	 * Get the relation descriptors of the FK and PK tables and the old tuple.
1034 	 *
1035 	 * fk_rel is opened in RowExclusiveLock mode since that's what our
1036 	 * eventual UPDATE will get on it.
1037 	 */
1038 	fk_rel = table_open(riinfo->fk_relid, RowExclusiveLock);
1039 	pk_rel = trigdata->tg_relation;
1040 	oldslot = trigdata->tg_trigslot;
1041 
1042 	if (SPI_connect() != SPI_OK_CONNECT)
1043 		elog(ERROR, "SPI_connect failed");
1044 
1045 	/*
1046 	 * Fetch or prepare a saved plan for the set null/default operation (it's
1047 	 * the same query for delete and update cases)
1048 	 */
1049 	ri_BuildQueryKey(&qkey, riinfo,
1050 					 (is_set_null
1051 					  ? RI_PLAN_SETNULL_DOUPDATE
1052 					  : RI_PLAN_SETDEFAULT_DOUPDATE));
1053 
1054 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
1055 	{
1056 		StringInfoData querybuf;
1057 		StringInfoData qualbuf;
1058 		char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
1059 		char		attname[MAX_QUOTED_NAME_LEN];
1060 		char		paramname[16];
1061 		const char *querysep;
1062 		const char *qualsep;
1063 		Oid			queryoids[RI_MAX_NUMKEYS];
1064 		const char *fk_only;
1065 
1066 		/* ----------
1067 		 * The query string built is
1068 		 *	UPDATE [ONLY] <fktable> SET fkatt1 = {NULL|DEFAULT} [, ...]
1069 		 *			WHERE $1 = fkatt1 [AND ...]
1070 		 * The type id's for the $ parameters are those of the
1071 		 * corresponding PK attributes.
1072 		 * ----------
1073 		 */
1074 		initStringInfo(&querybuf);
1075 		initStringInfo(&qualbuf);
1076 		fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
1077 			"" : "ONLY ";
1078 		quoteRelationName(fkrelname, fk_rel);
1079 		appendStringInfo(&querybuf, "UPDATE %s%s SET",
1080 						 fk_only, fkrelname);
1081 		querysep = "";
1082 		qualsep = "WHERE";
1083 		for (int i = 0; i < riinfo->nkeys; i++)
1084 		{
1085 			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
1086 			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
1087 			Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
1088 			Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
1089 
1090 			quoteOneName(attname,
1091 						 RIAttName(fk_rel, riinfo->fk_attnums[i]));
1092 			appendStringInfo(&querybuf,
1093 							 "%s %s = %s",
1094 							 querysep, attname,
1095 							 is_set_null ? "NULL" : "DEFAULT");
1096 			sprintf(paramname, "$%d", i + 1);
1097 			ri_GenerateQual(&qualbuf, qualsep,
1098 							paramname, pk_type,
1099 							riinfo->pf_eq_oprs[i],
1100 							attname, fk_type);
1101 			if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
1102 				ri_GenerateQualCollation(&querybuf, pk_coll);
1103 			querysep = ",";
1104 			qualsep = "AND";
1105 			queryoids[i] = pk_type;
1106 		}
1107 		appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len);
1108 
1109 		/* Prepare and save the plan */
1110 		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
1111 							 &qkey, fk_rel, pk_rel);
1112 	}
1113 
1114 	/*
1115 	 * We have a plan now. Run it to update the existing references.
1116 	 */
1117 	ri_PerformCheck(riinfo, &qkey, qplan,
1118 					fk_rel, pk_rel,
1119 					oldslot, NULL,
1120 					true,		/* must detect new rows */
1121 					SPI_OK_UPDATE);
1122 
1123 	if (SPI_finish() != SPI_OK_FINISH)
1124 		elog(ERROR, "SPI_finish failed");
1125 
1126 	table_close(fk_rel, RowExclusiveLock);
1127 
1128 	if (is_set_null)
1129 		return PointerGetDatum(NULL);
1130 	else
1131 	{
1132 		/*
1133 		 * If we just deleted or updated the PK row whose key was equal to the
1134 		 * FK columns' default values, and a referencing row exists in the FK
1135 		 * table, we would have updated that row to the same values it already
1136 		 * had --- and RI_FKey_fk_upd_check_required would hence believe no
1137 		 * check is necessary.  So we need to do another lookup now and in
1138 		 * case a reference still exists, abort the operation.  That is
1139 		 * already implemented in the NO ACTION trigger, so just run it. (This
1140 		 * recheck is only needed in the SET DEFAULT case, since CASCADE would
1141 		 * remove such rows in case of a DELETE operation or would change the
1142 		 * FK key values in case of an UPDATE, while SET NULL is certain to
1143 		 * result in rows that satisfy the FK constraint.)
1144 		 */
1145 		return ri_restrict(trigdata, true);
1146 	}
1147 }
1148 
1149 
1150 /*
1151  * RI_FKey_pk_upd_check_required -
1152  *
1153  * Check if we really need to fire the RI trigger for an update or delete to a PK
1154  * relation.  This is called by the AFTER trigger queue manager to see if
1155  * it can skip queuing an instance of an RI trigger.  Returns true if the
1156  * trigger must be fired, false if we can prove the constraint will still
1157  * be satisfied.
1158  *
1159  * newslot will be NULL if this is called for a delete.
1160  */
1161 bool
RI_FKey_pk_upd_check_required(Trigger * trigger,Relation pk_rel,TupleTableSlot * oldslot,TupleTableSlot * newslot)1162 RI_FKey_pk_upd_check_required(Trigger *trigger, Relation pk_rel,
1163 							  TupleTableSlot *oldslot, TupleTableSlot *newslot)
1164 {
1165 	const RI_ConstraintInfo *riinfo;
1166 
1167 	riinfo = ri_FetchConstraintInfo(trigger, pk_rel, true);
1168 
1169 	/*
1170 	 * If any old key value is NULL, the row could not have been referenced by
1171 	 * an FK row, so no check is needed.
1172 	 */
1173 	if (ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) != RI_KEYS_NONE_NULL)
1174 		return false;
1175 
1176 	/* If all old and new key values are equal, no check is needed */
1177 	if (newslot && ri_KeysEqual(pk_rel, oldslot, newslot, riinfo, true))
1178 		return false;
1179 
1180 	/* Else we need to fire the trigger. */
1181 	return true;
1182 }
1183 
1184 /*
1185  * RI_FKey_fk_upd_check_required -
1186  *
1187  * Check if we really need to fire the RI trigger for an update to an FK
1188  * relation.  This is called by the AFTER trigger queue manager to see if
1189  * it can skip queuing an instance of an RI trigger.  Returns true if the
1190  * trigger must be fired, false if we can prove the constraint will still
1191  * be satisfied.
1192  */
1193 bool
RI_FKey_fk_upd_check_required(Trigger * trigger,Relation fk_rel,TupleTableSlot * oldslot,TupleTableSlot * newslot)1194 RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel,
1195 							  TupleTableSlot *oldslot, TupleTableSlot *newslot)
1196 {
1197 	const RI_ConstraintInfo *riinfo;
1198 	int			ri_nullcheck;
1199 	Datum		xminDatum;
1200 	TransactionId xmin;
1201 	bool		isnull;
1202 
1203 	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
1204 
1205 	ri_nullcheck = ri_NullCheck(RelationGetDescr(fk_rel), newslot, riinfo, false);
1206 
1207 	/*
1208 	 * If all new key values are NULL, the row satisfies the constraint, so no
1209 	 * check is needed.
1210 	 */
1211 	if (ri_nullcheck == RI_KEYS_ALL_NULL)
1212 		return false;
1213 
1214 	/*
1215 	 * If some new key values are NULL, the behavior depends on the match
1216 	 * type.
1217 	 */
1218 	else if (ri_nullcheck == RI_KEYS_SOME_NULL)
1219 	{
1220 		switch (riinfo->confmatchtype)
1221 		{
1222 			case FKCONSTR_MATCH_SIMPLE:
1223 
1224 				/*
1225 				 * If any new key value is NULL, the row must satisfy the
1226 				 * constraint, so no check is needed.
1227 				 */
1228 				return false;
1229 
1230 			case FKCONSTR_MATCH_PARTIAL:
1231 
1232 				/*
1233 				 * Don't know, must run full check.
1234 				 */
1235 				break;
1236 
1237 			case FKCONSTR_MATCH_FULL:
1238 
1239 				/*
1240 				 * If some new key values are NULL, the row fails the
1241 				 * constraint.  We must not throw error here, because the row
1242 				 * might get invalidated before the constraint is to be
1243 				 * checked, but we should queue the event to apply the check
1244 				 * later.
1245 				 */
1246 				return true;
1247 		}
1248 	}
1249 
1250 	/*
1251 	 * Continues here for no new key values are NULL, or we couldn't decide
1252 	 * yet.
1253 	 */
1254 
1255 	/*
1256 	 * If the original row was inserted by our own transaction, we must fire
1257 	 * the trigger whether or not the keys are equal.  This is because our
1258 	 * UPDATE will invalidate the INSERT so that the INSERT RI trigger will
1259 	 * not do anything; so we had better do the UPDATE check.  (We could skip
1260 	 * this if we knew the INSERT trigger already fired, but there is no easy
1261 	 * way to know that.)
1262 	 */
1263 	xminDatum = slot_getsysattr(oldslot, MinTransactionIdAttributeNumber, &isnull);
1264 	Assert(!isnull);
1265 	xmin = DatumGetTransactionId(xminDatum);
1266 	if (TransactionIdIsCurrentTransactionId(xmin))
1267 		return true;
1268 
1269 	/* If all old and new key values are equal, no check is needed */
1270 	if (ri_KeysEqual(fk_rel, oldslot, newslot, riinfo, false))
1271 		return false;
1272 
1273 	/* Else we need to fire the trigger. */
1274 	return true;
1275 }
1276 
1277 /*
1278  * RI_Initial_Check -
1279  *
1280  * Check an entire table for non-matching values using a single query.
1281  * This is not a trigger procedure, but is called during ALTER TABLE
1282  * ADD FOREIGN KEY to validate the initial table contents.
1283  *
1284  * We expect that the caller has made provision to prevent any problems
1285  * caused by concurrent actions. This could be either by locking rel and
1286  * pkrel at ShareRowExclusiveLock or higher, or by otherwise ensuring
1287  * that triggers implementing the checks are already active.
1288  * Hence, we do not need to lock individual rows for the check.
1289  *
1290  * If the check fails because the current user doesn't have permissions
1291  * to read both tables, return false to let our caller know that they will
1292  * need to do something else to check the constraint.
1293  */
1294 bool
RI_Initial_Check(Trigger * trigger,Relation fk_rel,Relation pk_rel)1295 RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
1296 {
1297 	const RI_ConstraintInfo *riinfo;
1298 	StringInfoData querybuf;
1299 	char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
1300 	char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
1301 	char		pkattname[MAX_QUOTED_NAME_LEN + 3];
1302 	char		fkattname[MAX_QUOTED_NAME_LEN + 3];
1303 	RangeTblEntry *pkrte;
1304 	RangeTblEntry *fkrte;
1305 	const char *sep;
1306 	const char *fk_only;
1307 	const char *pk_only;
1308 	int			save_nestlevel;
1309 	char		workmembuf[32];
1310 	int			spi_result;
1311 	SPIPlanPtr	qplan;
1312 
1313 	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
1314 
1315 	/*
1316 	 * Check to make sure current user has enough permissions to do the test
1317 	 * query.  (If not, caller can fall back to the trigger method, which
1318 	 * works because it changes user IDs on the fly.)
1319 	 *
1320 	 * XXX are there any other show-stopper conditions to check?
1321 	 */
1322 	pkrte = makeNode(RangeTblEntry);
1323 	pkrte->rtekind = RTE_RELATION;
1324 	pkrte->relid = RelationGetRelid(pk_rel);
1325 	pkrte->relkind = pk_rel->rd_rel->relkind;
1326 	pkrte->rellockmode = AccessShareLock;
1327 	pkrte->requiredPerms = ACL_SELECT;
1328 
1329 	fkrte = makeNode(RangeTblEntry);
1330 	fkrte->rtekind = RTE_RELATION;
1331 	fkrte->relid = RelationGetRelid(fk_rel);
1332 	fkrte->relkind = fk_rel->rd_rel->relkind;
1333 	fkrte->rellockmode = AccessShareLock;
1334 	fkrte->requiredPerms = ACL_SELECT;
1335 
1336 	for (int i = 0; i < riinfo->nkeys; i++)
1337 	{
1338 		int			attno;
1339 
1340 		attno = riinfo->pk_attnums[i] - FirstLowInvalidHeapAttributeNumber;
1341 		pkrte->selectedCols = bms_add_member(pkrte->selectedCols, attno);
1342 
1343 		attno = riinfo->fk_attnums[i] - FirstLowInvalidHeapAttributeNumber;
1344 		fkrte->selectedCols = bms_add_member(fkrte->selectedCols, attno);
1345 	}
1346 
1347 	if (!ExecCheckRTPerms(list_make2(fkrte, pkrte), false))
1348 		return false;
1349 
1350 	/*
1351 	 * Also punt if RLS is enabled on either table unless this role has the
1352 	 * bypassrls right or is the table owner of the table(s) involved which
1353 	 * have RLS enabled.
1354 	 */
1355 	if (!has_bypassrls_privilege(GetUserId()) &&
1356 		((pk_rel->rd_rel->relrowsecurity &&
1357 		  !pg_class_ownercheck(pkrte->relid, GetUserId())) ||
1358 		 (fk_rel->rd_rel->relrowsecurity &&
1359 		  !pg_class_ownercheck(fkrte->relid, GetUserId()))))
1360 		return false;
1361 
1362 	/*----------
1363 	 * The query string built is:
1364 	 *	SELECT fk.keycols FROM [ONLY] relname fk
1365 	 *	 LEFT OUTER JOIN [ONLY] pkrelname pk
1366 	 *	 ON (pk.pkkeycol1=fk.keycol1 [AND ...])
1367 	 *	 WHERE pk.pkkeycol1 IS NULL AND
1368 	 * For MATCH SIMPLE:
1369 	 *	 (fk.keycol1 IS NOT NULL [AND ...])
1370 	 * For MATCH FULL:
1371 	 *	 (fk.keycol1 IS NOT NULL [OR ...])
1372 	 *
1373 	 * We attach COLLATE clauses to the operators when comparing columns
1374 	 * that have different collations.
1375 	 *----------
1376 	 */
1377 	initStringInfo(&querybuf);
1378 	appendStringInfoString(&querybuf, "SELECT ");
1379 	sep = "";
1380 	for (int i = 0; i < riinfo->nkeys; i++)
1381 	{
1382 		quoteOneName(fkattname,
1383 					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
1384 		appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
1385 		sep = ", ";
1386 	}
1387 
1388 	quoteRelationName(pkrelname, pk_rel);
1389 	quoteRelationName(fkrelname, fk_rel);
1390 	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
1391 		"" : "ONLY ";
1392 	pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
1393 		"" : "ONLY ";
1394 	appendStringInfo(&querybuf,
1395 					 " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON",
1396 					 fk_only, fkrelname, pk_only, pkrelname);
1397 
1398 	strcpy(pkattname, "pk.");
1399 	strcpy(fkattname, "fk.");
1400 	sep = "(";
1401 	for (int i = 0; i < riinfo->nkeys; i++)
1402 	{
1403 		Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
1404 		Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
1405 		Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
1406 		Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
1407 
1408 		quoteOneName(pkattname + 3,
1409 					 RIAttName(pk_rel, riinfo->pk_attnums[i]));
1410 		quoteOneName(fkattname + 3,
1411 					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
1412 		ri_GenerateQual(&querybuf, sep,
1413 						pkattname, pk_type,
1414 						riinfo->pf_eq_oprs[i],
1415 						fkattname, fk_type);
1416 		if (pk_coll != fk_coll)
1417 			ri_GenerateQualCollation(&querybuf, pk_coll);
1418 		sep = "AND";
1419 	}
1420 
1421 	/*
1422 	 * It's sufficient to test any one pk attribute for null to detect a join
1423 	 * failure.
1424 	 */
1425 	quoteOneName(pkattname, RIAttName(pk_rel, riinfo->pk_attnums[0]));
1426 	appendStringInfo(&querybuf, ") WHERE pk.%s IS NULL AND (", pkattname);
1427 
1428 	sep = "";
1429 	for (int i = 0; i < riinfo->nkeys; i++)
1430 	{
1431 		quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
1432 		appendStringInfo(&querybuf,
1433 						 "%sfk.%s IS NOT NULL",
1434 						 sep, fkattname);
1435 		switch (riinfo->confmatchtype)
1436 		{
1437 			case FKCONSTR_MATCH_SIMPLE:
1438 				sep = " AND ";
1439 				break;
1440 			case FKCONSTR_MATCH_FULL:
1441 				sep = " OR ";
1442 				break;
1443 		}
1444 	}
1445 	appendStringInfoChar(&querybuf, ')');
1446 
1447 	/*
1448 	 * Temporarily increase work_mem so that the check query can be executed
1449 	 * more efficiently.  It seems okay to do this because the query is simple
1450 	 * enough to not use a multiple of work_mem, and one typically would not
1451 	 * have many large foreign-key validations happening concurrently.  So
1452 	 * this seems to meet the criteria for being considered a "maintenance"
1453 	 * operation, and accordingly we use maintenance_work_mem.  However, we
1454 	 * must also set hash_mem_multiplier to 1, since it is surely not okay to
1455 	 * let that get applied to the maintenance_work_mem value.
1456 	 *
1457 	 * We use the equivalent of a function SET option to allow the setting to
1458 	 * persist for exactly the duration of the check query.  guc.c also takes
1459 	 * care of undoing the setting on error.
1460 	 */
1461 	save_nestlevel = NewGUCNestLevel();
1462 
1463 	snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem);
1464 	(void) set_config_option("work_mem", workmembuf,
1465 							 PGC_USERSET, PGC_S_SESSION,
1466 							 GUC_ACTION_SAVE, true, 0, false);
1467 	(void) set_config_option("hash_mem_multiplier", "1",
1468 							 PGC_USERSET, PGC_S_SESSION,
1469 							 GUC_ACTION_SAVE, true, 0, false);
1470 
1471 	if (SPI_connect() != SPI_OK_CONNECT)
1472 		elog(ERROR, "SPI_connect failed");
1473 
1474 	/*
1475 	 * Generate the plan.  We don't need to cache it, and there are no
1476 	 * arguments to the plan.
1477 	 */
1478 	qplan = SPI_prepare(querybuf.data, 0, NULL);
1479 
1480 	if (qplan == NULL)
1481 		elog(ERROR, "SPI_prepare returned %s for %s",
1482 			 SPI_result_code_string(SPI_result), querybuf.data);
1483 
1484 	/*
1485 	 * Run the plan.  For safety we force a current snapshot to be used. (In
1486 	 * transaction-snapshot mode, this arguably violates transaction isolation
1487 	 * rules, but we really haven't got much choice.) We don't need to
1488 	 * register the snapshot, because SPI_execute_snapshot will see to it. We
1489 	 * need at most one tuple returned, so pass limit = 1.
1490 	 */
1491 	spi_result = SPI_execute_snapshot(qplan,
1492 									  NULL, NULL,
1493 									  GetLatestSnapshot(),
1494 									  InvalidSnapshot,
1495 									  true, false, 1);
1496 
1497 	/* Check result */
1498 	if (spi_result != SPI_OK_SELECT)
1499 		elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
1500 
1501 	/* Did we find a tuple violating the constraint? */
1502 	if (SPI_processed > 0)
1503 	{
1504 		TupleTableSlot *slot;
1505 		HeapTuple	tuple = SPI_tuptable->vals[0];
1506 		TupleDesc	tupdesc = SPI_tuptable->tupdesc;
1507 		RI_ConstraintInfo fake_riinfo;
1508 
1509 		slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
1510 
1511 		heap_deform_tuple(tuple, tupdesc,
1512 						  slot->tts_values, slot->tts_isnull);
1513 		ExecStoreVirtualTuple(slot);
1514 
1515 		/*
1516 		 * The columns to look at in the result tuple are 1..N, not whatever
1517 		 * they are in the fk_rel.  Hack up riinfo so that the subroutines
1518 		 * called here will behave properly.
1519 		 *
1520 		 * In addition to this, we have to pass the correct tupdesc to
1521 		 * ri_ReportViolation, overriding its normal habit of using the pk_rel
1522 		 * or fk_rel's tupdesc.
1523 		 */
1524 		memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
1525 		for (int i = 0; i < fake_riinfo.nkeys; i++)
1526 			fake_riinfo.fk_attnums[i] = i + 1;
1527 
1528 		/*
1529 		 * If it's MATCH FULL, and there are any nulls in the FK keys,
1530 		 * complain about that rather than the lack of a match.  MATCH FULL
1531 		 * disallows partially-null FK rows.
1532 		 */
1533 		if (fake_riinfo.confmatchtype == FKCONSTR_MATCH_FULL &&
1534 			ri_NullCheck(tupdesc, slot, &fake_riinfo, false) != RI_KEYS_NONE_NULL)
1535 			ereport(ERROR,
1536 					(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
1537 					 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
1538 							RelationGetRelationName(fk_rel),
1539 							NameStr(fake_riinfo.conname)),
1540 					 errdetail("MATCH FULL does not allow mixing of null and nonnull key values."),
1541 					 errtableconstraint(fk_rel,
1542 										NameStr(fake_riinfo.conname))));
1543 
1544 		/*
1545 		 * We tell ri_ReportViolation we were doing the RI_PLAN_CHECK_LOOKUPPK
1546 		 * query, which isn't true, but will cause it to use
1547 		 * fake_riinfo.fk_attnums as we need.
1548 		 */
1549 		ri_ReportViolation(&fake_riinfo,
1550 						   pk_rel, fk_rel,
1551 						   slot, tupdesc,
1552 						   RI_PLAN_CHECK_LOOKUPPK, false);
1553 
1554 		ExecDropSingleTupleTableSlot(slot);
1555 	}
1556 
1557 	if (SPI_finish() != SPI_OK_FINISH)
1558 		elog(ERROR, "SPI_finish failed");
1559 
1560 	/*
1561 	 * Restore work_mem and hash_mem_multiplier.
1562 	 */
1563 	AtEOXact_GUC(true, save_nestlevel);
1564 
1565 	return true;
1566 }
1567 
1568 /*
1569  * RI_PartitionRemove_Check -
1570  *
1571  * Verify no referencing values exist, when a partition is detached on
1572  * the referenced side of a foreign key constraint.
1573  */
1574 void
RI_PartitionRemove_Check(Trigger * trigger,Relation fk_rel,Relation pk_rel)1575 RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
1576 {
1577 	const RI_ConstraintInfo *riinfo;
1578 	StringInfoData querybuf;
1579 	char	   *constraintDef;
1580 	char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
1581 	char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
1582 	char		pkattname[MAX_QUOTED_NAME_LEN + 3];
1583 	char		fkattname[MAX_QUOTED_NAME_LEN + 3];
1584 	const char *sep;
1585 	const char *fk_only;
1586 	int			save_nestlevel;
1587 	char		workmembuf[32];
1588 	int			spi_result;
1589 	SPIPlanPtr	qplan;
1590 	int			i;
1591 
1592 	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
1593 
1594 	/*
1595 	 * We don't check permissions before displaying the error message, on the
1596 	 * assumption that the user detaching the partition must have enough
1597 	 * privileges to examine the table contents anyhow.
1598 	 */
1599 
1600 	/*----------
1601 	 * The query string built is:
1602 	 *  SELECT fk.keycols FROM [ONLY] relname fk
1603 	 *    JOIN pkrelname pk
1604 	 *    ON (pk.pkkeycol1=fk.keycol1 [AND ...])
1605 	 *    WHERE (<partition constraint>) AND
1606 	 * For MATCH SIMPLE:
1607 	 *   (fk.keycol1 IS NOT NULL [AND ...])
1608 	 * For MATCH FULL:
1609 	 *   (fk.keycol1 IS NOT NULL [OR ...])
1610 	 *
1611 	 * We attach COLLATE clauses to the operators when comparing columns
1612 	 * that have different collations.
1613 	 *----------
1614 	 */
1615 	initStringInfo(&querybuf);
1616 	appendStringInfoString(&querybuf, "SELECT ");
1617 	sep = "";
1618 	for (i = 0; i < riinfo->nkeys; i++)
1619 	{
1620 		quoteOneName(fkattname,
1621 					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
1622 		appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
1623 		sep = ", ";
1624 	}
1625 
1626 	quoteRelationName(pkrelname, pk_rel);
1627 	quoteRelationName(fkrelname, fk_rel);
1628 	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
1629 		"" : "ONLY ";
1630 	appendStringInfo(&querybuf,
1631 					 " FROM %s%s fk JOIN %s pk ON",
1632 					 fk_only, fkrelname, pkrelname);
1633 	strcpy(pkattname, "pk.");
1634 	strcpy(fkattname, "fk.");
1635 	sep = "(";
1636 	for (i = 0; i < riinfo->nkeys; i++)
1637 	{
1638 		Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
1639 		Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
1640 		Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
1641 		Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
1642 
1643 		quoteOneName(pkattname + 3,
1644 					 RIAttName(pk_rel, riinfo->pk_attnums[i]));
1645 		quoteOneName(fkattname + 3,
1646 					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
1647 		ri_GenerateQual(&querybuf, sep,
1648 						pkattname, pk_type,
1649 						riinfo->pf_eq_oprs[i],
1650 						fkattname, fk_type);
1651 		if (pk_coll != fk_coll)
1652 			ri_GenerateQualCollation(&querybuf, pk_coll);
1653 		sep = "AND";
1654 	}
1655 
1656 	/*
1657 	 * Start the WHERE clause with the partition constraint (except if this is
1658 	 * the default partition and there's no other partition, because the
1659 	 * partition constraint is the empty string in that case.)
1660 	 */
1661 	constraintDef = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk");
1662 	if (constraintDef && constraintDef[0] != '\0')
1663 		appendStringInfo(&querybuf, ") WHERE %s AND (",
1664 						 constraintDef);
1665 	else
1666 		appendStringInfo(&querybuf, ") WHERE (");
1667 
1668 	sep = "";
1669 	for (i = 0; i < riinfo->nkeys; i++)
1670 	{
1671 		quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
1672 		appendStringInfo(&querybuf,
1673 						 "%sfk.%s IS NOT NULL",
1674 						 sep, fkattname);
1675 		switch (riinfo->confmatchtype)
1676 		{
1677 			case FKCONSTR_MATCH_SIMPLE:
1678 				sep = " AND ";
1679 				break;
1680 			case FKCONSTR_MATCH_FULL:
1681 				sep = " OR ";
1682 				break;
1683 		}
1684 	}
1685 	appendStringInfoChar(&querybuf, ')');
1686 
1687 	/*
1688 	 * Temporarily increase work_mem so that the check query can be executed
1689 	 * more efficiently.  It seems okay to do this because the query is simple
1690 	 * enough to not use a multiple of work_mem, and one typically would not
1691 	 * have many large foreign-key validations happening concurrently.  So
1692 	 * this seems to meet the criteria for being considered a "maintenance"
1693 	 * operation, and accordingly we use maintenance_work_mem.  However, we
1694 	 * must also set hash_mem_multiplier to 1, since it is surely not okay to
1695 	 * let that get applied to the maintenance_work_mem value.
1696 	 *
1697 	 * We use the equivalent of a function SET option to allow the setting to
1698 	 * persist for exactly the duration of the check query.  guc.c also takes
1699 	 * care of undoing the setting on error.
1700 	 */
1701 	save_nestlevel = NewGUCNestLevel();
1702 
1703 	snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem);
1704 	(void) set_config_option("work_mem", workmembuf,
1705 							 PGC_USERSET, PGC_S_SESSION,
1706 							 GUC_ACTION_SAVE, true, 0, false);
1707 	(void) set_config_option("hash_mem_multiplier", "1",
1708 							 PGC_USERSET, PGC_S_SESSION,
1709 							 GUC_ACTION_SAVE, true, 0, false);
1710 
1711 	if (SPI_connect() != SPI_OK_CONNECT)
1712 		elog(ERROR, "SPI_connect failed");
1713 
1714 	/*
1715 	 * Generate the plan.  We don't need to cache it, and there are no
1716 	 * arguments to the plan.
1717 	 */
1718 	qplan = SPI_prepare(querybuf.data, 0, NULL);
1719 
1720 	if (qplan == NULL)
1721 		elog(ERROR, "SPI_prepare returned %s for %s",
1722 			 SPI_result_code_string(SPI_result), querybuf.data);
1723 
1724 	/*
1725 	 * Run the plan.  For safety we force a current snapshot to be used. (In
1726 	 * transaction-snapshot mode, this arguably violates transaction isolation
1727 	 * rules, but we really haven't got much choice.) We don't need to
1728 	 * register the snapshot, because SPI_execute_snapshot will see to it. We
1729 	 * need at most one tuple returned, so pass limit = 1.
1730 	 */
1731 	spi_result = SPI_execute_snapshot(qplan,
1732 									  NULL, NULL,
1733 									  GetLatestSnapshot(),
1734 									  InvalidSnapshot,
1735 									  true, false, 1);
1736 
1737 	/* Check result */
1738 	if (spi_result != SPI_OK_SELECT)
1739 		elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
1740 
1741 	/* Did we find a tuple that would violate the constraint? */
1742 	if (SPI_processed > 0)
1743 	{
1744 		TupleTableSlot *slot;
1745 		HeapTuple	tuple = SPI_tuptable->vals[0];
1746 		TupleDesc	tupdesc = SPI_tuptable->tupdesc;
1747 		RI_ConstraintInfo fake_riinfo;
1748 
1749 		slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
1750 
1751 		heap_deform_tuple(tuple, tupdesc,
1752 						  slot->tts_values, slot->tts_isnull);
1753 		ExecStoreVirtualTuple(slot);
1754 
1755 		/*
1756 		 * The columns to look at in the result tuple are 1..N, not whatever
1757 		 * they are in the fk_rel.  Hack up riinfo so that ri_ReportViolation
1758 		 * will behave properly.
1759 		 *
1760 		 * In addition to this, we have to pass the correct tupdesc to
1761 		 * ri_ReportViolation, overriding its normal habit of using the pk_rel
1762 		 * or fk_rel's tupdesc.
1763 		 */
1764 		memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
1765 		for (i = 0; i < fake_riinfo.nkeys; i++)
1766 			fake_riinfo.pk_attnums[i] = i + 1;
1767 
1768 		ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
1769 						   slot, tupdesc, 0, true);
1770 	}
1771 
1772 	if (SPI_finish() != SPI_OK_FINISH)
1773 		elog(ERROR, "SPI_finish failed");
1774 
1775 	/*
1776 	 * Restore work_mem and hash_mem_multiplier.
1777 	 */
1778 	AtEOXact_GUC(true, save_nestlevel);
1779 }
1780 
1781 
1782 /* ----------
1783  * Local functions below
1784  * ----------
1785  */
1786 
1787 
/*
 * quoteOneName --- safely quote a single SQL name
 *
 * Writes name into buffer surrounded by double quotes, doubling every
 * double quote that occurs inside the name, and terminates the result
 * with \0.
 *
 * buffer must be MAX_QUOTED_NAME_LEN long (includes room for \0)
 */
static void
quoteOneName(char *buffer, const char *name)
{
	char	   *out = buffer;
	const char *in;

	/* No attempt is made to detect names that need no quoting. */
	*out++ = '"';
	for (in = name; *in != '\0'; in++)
	{
		/* an embedded quote is escaped by emitting it twice */
		if (*in == '"')
			*out++ = '"';
		*out++ = *in;
	}
	*out++ = '"';
	*out = '\0';
}
1807 
1808 /*
1809  * quoteRelationName --- safely quote a fully qualified relation name
1810  *
1811  * buffer must be MAX_QUOTED_REL_NAME_LEN long (includes room for \0)
1812  */
1813 static void
quoteRelationName(char * buffer,Relation rel)1814 quoteRelationName(char *buffer, Relation rel)
1815 {
1816 	quoteOneName(buffer, get_namespace_name(RelationGetNamespace(rel)));
1817 	buffer += strlen(buffer);
1818 	*buffer++ = '.';
1819 	quoteOneName(buffer, RelationGetRelationName(rel));
1820 }
1821 
1822 /*
1823  * ri_GenerateQual --- generate a WHERE clause equating two variables
1824  *
1825  * This basically appends " sep leftop op rightop" to buf, adding casts
1826  * and schema qualification as needed to ensure that the parser will select
1827  * the operator we specify.  leftop and rightop should be parenthesized
1828  * if they aren't variables or parameters.
1829  */
1830 static void
ri_GenerateQual(StringInfo buf,const char * sep,const char * leftop,Oid leftoptype,Oid opoid,const char * rightop,Oid rightoptype)1831 ri_GenerateQual(StringInfo buf,
1832 				const char *sep,
1833 				const char *leftop, Oid leftoptype,
1834 				Oid opoid,
1835 				const char *rightop, Oid rightoptype)
1836 {
1837 	appendStringInfo(buf, " %s ", sep);
1838 	generate_operator_clause(buf, leftop, leftoptype, opoid,
1839 							 rightop, rightoptype);
1840 }
1841 
1842 /*
1843  * ri_GenerateQualCollation --- add a COLLATE spec to a WHERE clause
1844  *
1845  * At present, we intentionally do not use this function for RI queries that
1846  * compare a variable to a $n parameter.  Since parameter symbols always have
1847  * default collation, the effect will be to use the variable's collation.
1848  * Now that is only strictly correct when testing the referenced column, since
1849  * the SQL standard specifies that RI comparisons should use the referenced
1850  * column's collation.  However, so long as all collations have the same
1851  * notion of equality (which they do, because texteq reduces to bitwise
1852  * equality), there's no visible semantic impact from using the referencing
1853  * column's collation when testing it, and this is a good thing to do because
1854  * it lets us use a normal index on the referencing column.  However, we do
1855  * have to use this function when directly comparing the referencing and
1856  * referenced columns, if they are of different collations; else the parser
1857  * will fail to resolve the collation to use.
1858  */
1859 static void
ri_GenerateQualCollation(StringInfo buf,Oid collation)1860 ri_GenerateQualCollation(StringInfo buf, Oid collation)
1861 {
1862 	HeapTuple	tp;
1863 	Form_pg_collation colltup;
1864 	char	   *collname;
1865 	char		onename[MAX_QUOTED_NAME_LEN];
1866 
1867 	/* Nothing to do if it's a noncollatable data type */
1868 	if (!OidIsValid(collation))
1869 		return;
1870 
1871 	tp = SearchSysCache1(COLLOID, ObjectIdGetDatum(collation));
1872 	if (!HeapTupleIsValid(tp))
1873 		elog(ERROR, "cache lookup failed for collation %u", collation);
1874 	colltup = (Form_pg_collation) GETSTRUCT(tp);
1875 	collname = NameStr(colltup->collname);
1876 
1877 	/*
1878 	 * We qualify the name always, for simplicity and to ensure the query is
1879 	 * not search-path-dependent.
1880 	 */
1881 	quoteOneName(onename, get_namespace_name(colltup->collnamespace));
1882 	appendStringInfo(buf, " COLLATE %s", onename);
1883 	quoteOneName(onename, collname);
1884 	appendStringInfo(buf, ".%s", onename);
1885 
1886 	ReleaseSysCache(tp);
1887 }
1888 
1889 /* ----------
1890  * ri_BuildQueryKey -
1891  *
1892  *	Construct a hashtable key for a prepared SPI plan of an FK constraint.
1893  *
1894  *		key: output argument, *key is filled in based on the other arguments
1895  *		riinfo: info from pg_constraint entry
1896  *		constr_queryno: an internal number identifying the query type
1897  *			(see RI_PLAN_XXX constants at head of file)
1898  * ----------
1899  */
1900 static void
ri_BuildQueryKey(RI_QueryKey * key,const RI_ConstraintInfo * riinfo,int32 constr_queryno)1901 ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
1902 				 int32 constr_queryno)
1903 {
1904 	/*
1905 	 * We assume struct RI_QueryKey contains no padding bytes, else we'd need
1906 	 * to use memset to clear them.
1907 	 */
1908 	key->constr_id = riinfo->constraint_id;
1909 	key->constr_queryno = constr_queryno;
1910 }
1911 
1912 /*
1913  * Check that RI trigger function was called in expected context
1914  */
1915 static void
ri_CheckTrigger(FunctionCallInfo fcinfo,const char * funcname,int tgkind)1916 ri_CheckTrigger(FunctionCallInfo fcinfo, const char *funcname, int tgkind)
1917 {
1918 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
1919 
1920 	if (!CALLED_AS_TRIGGER(fcinfo))
1921 		ereport(ERROR,
1922 				(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1923 				 errmsg("function \"%s\" was not called by trigger manager", funcname)));
1924 
1925 	/*
1926 	 * Check proper event
1927 	 */
1928 	if (!TRIGGER_FIRED_AFTER(trigdata->tg_event) ||
1929 		!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
1930 		ereport(ERROR,
1931 				(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1932 				 errmsg("function \"%s\" must be fired AFTER ROW", funcname)));
1933 
1934 	switch (tgkind)
1935 	{
1936 		case RI_TRIGTYPE_INSERT:
1937 			if (!TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
1938 				ereport(ERROR,
1939 						(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1940 						 errmsg("function \"%s\" must be fired for INSERT", funcname)));
1941 			break;
1942 		case RI_TRIGTYPE_UPDATE:
1943 			if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
1944 				ereport(ERROR,
1945 						(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1946 						 errmsg("function \"%s\" must be fired for UPDATE", funcname)));
1947 			break;
1948 		case RI_TRIGTYPE_DELETE:
1949 			if (!TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
1950 				ereport(ERROR,
1951 						(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1952 						 errmsg("function \"%s\" must be fired for DELETE", funcname)));
1953 			break;
1954 	}
1955 }
1956 
1957 
1958 /*
1959  * Fetch the RI_ConstraintInfo struct for the trigger's FK constraint.
1960  */
1961 static const RI_ConstraintInfo *
ri_FetchConstraintInfo(Trigger * trigger,Relation trig_rel,bool rel_is_pk)1962 ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
1963 {
1964 	Oid			constraintOid = trigger->tgconstraint;
1965 	const RI_ConstraintInfo *riinfo;
1966 
1967 	/*
1968 	 * Check that the FK constraint's OID is available; it might not be if
1969 	 * we've been invoked via an ordinary trigger or an old-style "constraint
1970 	 * trigger".
1971 	 */
1972 	if (!OidIsValid(constraintOid))
1973 		ereport(ERROR,
1974 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1975 				 errmsg("no pg_constraint entry for trigger \"%s\" on table \"%s\"",
1976 						trigger->tgname, RelationGetRelationName(trig_rel)),
1977 				 errhint("Remove this referential integrity trigger and its mates, then do ALTER TABLE ADD CONSTRAINT.")));
1978 
1979 	/* Find or create a hashtable entry for the constraint */
1980 	riinfo = ri_LoadConstraintInfo(constraintOid);
1981 
1982 	/* Do some easy cross-checks against the trigger call data */
1983 	if (rel_is_pk)
1984 	{
1985 		if (riinfo->fk_relid != trigger->tgconstrrelid ||
1986 			riinfo->pk_relid != RelationGetRelid(trig_rel))
1987 			elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
1988 				 trigger->tgname, RelationGetRelationName(trig_rel));
1989 	}
1990 	else
1991 	{
1992 		if (riinfo->fk_relid != RelationGetRelid(trig_rel) ||
1993 			riinfo->pk_relid != trigger->tgconstrrelid)
1994 			elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
1995 				 trigger->tgname, RelationGetRelationName(trig_rel));
1996 	}
1997 
1998 	if (riinfo->confmatchtype != FKCONSTR_MATCH_FULL &&
1999 		riinfo->confmatchtype != FKCONSTR_MATCH_PARTIAL &&
2000 		riinfo->confmatchtype != FKCONSTR_MATCH_SIMPLE)
2001 		elog(ERROR, "unrecognized confmatchtype: %d",
2002 			 riinfo->confmatchtype);
2003 
2004 	if (riinfo->confmatchtype == FKCONSTR_MATCH_PARTIAL)
2005 		ereport(ERROR,
2006 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2007 				 errmsg("MATCH PARTIAL not yet implemented")));
2008 
2009 	return riinfo;
2010 }
2011 
2012 /*
2013  * Fetch or create the RI_ConstraintInfo struct for an FK constraint.
2014  */
2015 static const RI_ConstraintInfo *
ri_LoadConstraintInfo(Oid constraintOid)2016 ri_LoadConstraintInfo(Oid constraintOid)
2017 {
2018 	RI_ConstraintInfo *riinfo;
2019 	bool		found;
2020 	HeapTuple	tup;
2021 	Form_pg_constraint conForm;
2022 
2023 	/*
2024 	 * On the first call initialize the hashtable
2025 	 */
2026 	if (!ri_constraint_cache)
2027 		ri_InitHashTables();
2028 
2029 	/*
2030 	 * Find or create a hash entry.  If we find a valid one, just return it.
2031 	 */
2032 	riinfo = (RI_ConstraintInfo *) hash_search(ri_constraint_cache,
2033 											   (void *) &constraintOid,
2034 											   HASH_ENTER, &found);
2035 	if (!found)
2036 		riinfo->valid = false;
2037 	else if (riinfo->valid)
2038 		return riinfo;
2039 
2040 	/*
2041 	 * Fetch the pg_constraint row so we can fill in the entry.
2042 	 */
2043 	tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraintOid));
2044 	if (!HeapTupleIsValid(tup)) /* should not happen */
2045 		elog(ERROR, "cache lookup failed for constraint %u", constraintOid);
2046 	conForm = (Form_pg_constraint) GETSTRUCT(tup);
2047 
2048 	if (conForm->contype != CONSTRAINT_FOREIGN) /* should not happen */
2049 		elog(ERROR, "constraint %u is not a foreign key constraint",
2050 			 constraintOid);
2051 
2052 	/* And extract data */
2053 	Assert(riinfo->constraint_id == constraintOid);
2054 	riinfo->oidHashValue = GetSysCacheHashValue1(CONSTROID,
2055 												 ObjectIdGetDatum(constraintOid));
2056 	memcpy(&riinfo->conname, &conForm->conname, sizeof(NameData));
2057 	riinfo->pk_relid = conForm->confrelid;
2058 	riinfo->fk_relid = conForm->conrelid;
2059 	riinfo->confupdtype = conForm->confupdtype;
2060 	riinfo->confdeltype = conForm->confdeltype;
2061 	riinfo->confmatchtype = conForm->confmatchtype;
2062 
2063 	DeconstructFkConstraintRow(tup,
2064 							   &riinfo->nkeys,
2065 							   riinfo->fk_attnums,
2066 							   riinfo->pk_attnums,
2067 							   riinfo->pf_eq_oprs,
2068 							   riinfo->pp_eq_oprs,
2069 							   riinfo->ff_eq_oprs);
2070 
2071 	ReleaseSysCache(tup);
2072 
2073 	/*
2074 	 * For efficient processing of invalidation messages below, we keep a
2075 	 * doubly-linked list, and a count, of all currently valid entries.
2076 	 */
2077 	dlist_push_tail(&ri_constraint_cache_valid_list, &riinfo->valid_link);
2078 	ri_constraint_cache_valid_count++;
2079 
2080 	riinfo->valid = true;
2081 
2082 	return riinfo;
2083 }
2084 
2085 /*
2086  * Callback for pg_constraint inval events
2087  *
2088  * While most syscache callbacks just flush all their entries, pg_constraint
2089  * gets enough update traffic that it's probably worth being smarter.
2090  * Invalidate any ri_constraint_cache entry associated with the syscache
2091  * entry with the specified hash value, or all entries if hashvalue == 0.
2092  *
2093  * Note: at the time a cache invalidation message is processed there may be
2094  * active references to the cache.  Because of this we never remove entries
2095  * from the cache, but only mark them invalid, which is harmless to active
2096  * uses.  (Any query using an entry should hold a lock sufficient to keep that
2097  * data from changing under it --- but we may get cache flushes anyway.)
2098  */
2099 static void
InvalidateConstraintCacheCallBack(Datum arg,int cacheid,uint32 hashvalue)2100 InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue)
2101 {
2102 	dlist_mutable_iter iter;
2103 
2104 	Assert(ri_constraint_cache != NULL);
2105 
2106 	/*
2107 	 * If the list of currently valid entries gets excessively large, we mark
2108 	 * them all invalid so we can empty the list.  This arrangement avoids
2109 	 * O(N^2) behavior in situations where a session touches many foreign keys
2110 	 * and also does many ALTER TABLEs, such as a restore from pg_dump.
2111 	 */
2112 	if (ri_constraint_cache_valid_count > 1000)
2113 		hashvalue = 0;			/* pretend it's a cache reset */
2114 
2115 	dlist_foreach_modify(iter, &ri_constraint_cache_valid_list)
2116 	{
2117 		RI_ConstraintInfo *riinfo = dlist_container(RI_ConstraintInfo,
2118 													valid_link, iter.cur);
2119 
2120 		if (hashvalue == 0 || riinfo->oidHashValue == hashvalue)
2121 		{
2122 			riinfo->valid = false;
2123 			/* Remove invalidated entries from the list, too */
2124 			dlist_delete(iter.cur);
2125 			ri_constraint_cache_valid_count--;
2126 		}
2127 	}
2128 }
2129 
2130 
2131 /*
2132  * Prepare execution plan for a query to enforce an RI restriction
2133  */
2134 static SPIPlanPtr
ri_PlanCheck(const char * querystr,int nargs,Oid * argtypes,RI_QueryKey * qkey,Relation fk_rel,Relation pk_rel)2135 ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
2136 			 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
2137 {
2138 	SPIPlanPtr	qplan;
2139 	Relation	query_rel;
2140 	Oid			save_userid;
2141 	int			save_sec_context;
2142 
2143 	/*
2144 	 * Use the query type code to determine whether the query is run against
2145 	 * the PK or FK table; we'll do the check as that table's owner
2146 	 */
2147 	if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
2148 		query_rel = pk_rel;
2149 	else
2150 		query_rel = fk_rel;
2151 
2152 	/* Switch to proper UID to perform check as */
2153 	GetUserIdAndSecContext(&save_userid, &save_sec_context);
2154 	SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
2155 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
2156 						   SECURITY_NOFORCE_RLS);
2157 
2158 	/* Create the plan */
2159 	qplan = SPI_prepare(querystr, nargs, argtypes);
2160 
2161 	if (qplan == NULL)
2162 		elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), querystr);
2163 
2164 	/* Restore UID and security context */
2165 	SetUserIdAndSecContext(save_userid, save_sec_context);
2166 
2167 	/* Save the plan */
2168 	SPI_keepplan(qplan);
2169 	ri_HashPreparedPlan(qkey, qplan);
2170 
2171 	return qplan;
2172 }
2173 
2174 /*
2175  * Perform a query to enforce an RI restriction
2176  */
2177 static bool
ri_PerformCheck(const RI_ConstraintInfo * riinfo,RI_QueryKey * qkey,SPIPlanPtr qplan,Relation fk_rel,Relation pk_rel,TupleTableSlot * oldslot,TupleTableSlot * newslot,bool detectNewRows,int expect_OK)2178 ri_PerformCheck(const RI_ConstraintInfo *riinfo,
2179 				RI_QueryKey *qkey, SPIPlanPtr qplan,
2180 				Relation fk_rel, Relation pk_rel,
2181 				TupleTableSlot *oldslot, TupleTableSlot *newslot,
2182 				bool detectNewRows, int expect_OK)
2183 {
2184 	Relation	query_rel,
2185 				source_rel;
2186 	bool		source_is_pk;
2187 	Snapshot	test_snapshot;
2188 	Snapshot	crosscheck_snapshot;
2189 	int			limit;
2190 	int			spi_result;
2191 	Oid			save_userid;
2192 	int			save_sec_context;
2193 	Datum		vals[RI_MAX_NUMKEYS * 2];
2194 	char		nulls[RI_MAX_NUMKEYS * 2];
2195 
2196 	/*
2197 	 * Use the query type code to determine whether the query is run against
2198 	 * the PK or FK table; we'll do the check as that table's owner
2199 	 */
2200 	if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
2201 		query_rel = pk_rel;
2202 	else
2203 		query_rel = fk_rel;
2204 
2205 	/*
2206 	 * The values for the query are taken from the table on which the trigger
2207 	 * is called - it is normally the other one with respect to query_rel. An
2208 	 * exception is ri_Check_Pk_Match(), which uses the PK table for both (and
2209 	 * sets queryno to RI_PLAN_CHECK_LOOKUPPK_FROM_PK).  We might eventually
2210 	 * need some less klugy way to determine this.
2211 	 */
2212 	if (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)
2213 	{
2214 		source_rel = fk_rel;
2215 		source_is_pk = false;
2216 	}
2217 	else
2218 	{
2219 		source_rel = pk_rel;
2220 		source_is_pk = true;
2221 	}
2222 
2223 	/* Extract the parameters to be passed into the query */
2224 	if (newslot)
2225 	{
2226 		ri_ExtractValues(source_rel, newslot, riinfo, source_is_pk,
2227 						 vals, nulls);
2228 		if (oldslot)
2229 			ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
2230 							 vals + riinfo->nkeys, nulls + riinfo->nkeys);
2231 	}
2232 	else
2233 	{
2234 		ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
2235 						 vals, nulls);
2236 	}
2237 
2238 	/*
2239 	 * In READ COMMITTED mode, we just need to use an up-to-date regular
2240 	 * snapshot, and we will see all rows that could be interesting. But in
2241 	 * transaction-snapshot mode, we can't change the transaction snapshot. If
2242 	 * the caller passes detectNewRows == false then it's okay to do the query
2243 	 * with the transaction snapshot; otherwise we use a current snapshot, and
2244 	 * tell the executor to error out if it finds any rows under the current
2245 	 * snapshot that wouldn't be visible per the transaction snapshot.  Note
2246 	 * that SPI_execute_snapshot will register the snapshots, so we don't need
2247 	 * to bother here.
2248 	 */
2249 	if (IsolationUsesXactSnapshot() && detectNewRows)
2250 	{
2251 		CommandCounterIncrement();	/* be sure all my own work is visible */
2252 		test_snapshot = GetLatestSnapshot();
2253 		crosscheck_snapshot = GetTransactionSnapshot();
2254 	}
2255 	else
2256 	{
2257 		/* the default SPI behavior is okay */
2258 		test_snapshot = InvalidSnapshot;
2259 		crosscheck_snapshot = InvalidSnapshot;
2260 	}
2261 
2262 	/*
2263 	 * If this is a select query (e.g., for a 'no action' or 'restrict'
2264 	 * trigger), we only need to see if there is a single row in the table,
2265 	 * matching the key.  Otherwise, limit = 0 - because we want the query to
2266 	 * affect ALL the matching rows.
2267 	 */
2268 	limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0;
2269 
2270 	/* Switch to proper UID to perform check as */
2271 	GetUserIdAndSecContext(&save_userid, &save_sec_context);
2272 	SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
2273 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
2274 						   SECURITY_NOFORCE_RLS);
2275 
2276 	/* Finally we can run the query. */
2277 	spi_result = SPI_execute_snapshot(qplan,
2278 									  vals, nulls,
2279 									  test_snapshot, crosscheck_snapshot,
2280 									  false, false, limit);
2281 
2282 	/* Restore UID and security context */
2283 	SetUserIdAndSecContext(save_userid, save_sec_context);
2284 
2285 	/* Check result */
2286 	if (spi_result < 0)
2287 		elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
2288 
2289 	if (expect_OK >= 0 && spi_result != expect_OK)
2290 		ereport(ERROR,
2291 				(errcode(ERRCODE_INTERNAL_ERROR),
2292 				 errmsg("referential integrity query on \"%s\" from constraint \"%s\" on \"%s\" gave unexpected result",
2293 						RelationGetRelationName(pk_rel),
2294 						NameStr(riinfo->conname),
2295 						RelationGetRelationName(fk_rel)),
2296 				 errhint("This is most likely due to a rule having rewritten the query.")));
2297 
2298 	/* XXX wouldn't it be clearer to do this part at the caller? */
2299 	if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
2300 		expect_OK == SPI_OK_SELECT &&
2301 		(SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
2302 		ri_ReportViolation(riinfo,
2303 						   pk_rel, fk_rel,
2304 						   newslot ? newslot : oldslot,
2305 						   NULL,
2306 						   qkey->constr_queryno, false);
2307 
2308 	return SPI_processed != 0;
2309 }
2310 
2311 /*
2312  * Extract fields from a tuple into Datum/nulls arrays
2313  */
2314 static void
ri_ExtractValues(Relation rel,TupleTableSlot * slot,const RI_ConstraintInfo * riinfo,bool rel_is_pk,Datum * vals,char * nulls)2315 ri_ExtractValues(Relation rel, TupleTableSlot *slot,
2316 				 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
2317 				 Datum *vals, char *nulls)
2318 {
2319 	const int16 *attnums;
2320 	bool		isnull;
2321 
2322 	if (rel_is_pk)
2323 		attnums = riinfo->pk_attnums;
2324 	else
2325 		attnums = riinfo->fk_attnums;
2326 
2327 	for (int i = 0; i < riinfo->nkeys; i++)
2328 	{
2329 		vals[i] = slot_getattr(slot, attnums[i], &isnull);
2330 		nulls[i] = isnull ? 'n' : ' ';
2331 	}
2332 }
2333 
2334 /*
2335  * Produce an error report
2336  *
2337  * If the failed constraint was on insert/update to the FK table,
2338  * we want the key names and values extracted from there, and the error
2339  * message to look like 'key blah is not present in PK'.
2340  * Otherwise, the attr names and values come from the PK table and the
2341  * message looks like 'key blah is still referenced from FK'.
2342  */
2343 static void
ri_ReportViolation(const RI_ConstraintInfo * riinfo,Relation pk_rel,Relation fk_rel,TupleTableSlot * violatorslot,TupleDesc tupdesc,int queryno,bool partgone)2344 ri_ReportViolation(const RI_ConstraintInfo *riinfo,
2345 				   Relation pk_rel, Relation fk_rel,
2346 				   TupleTableSlot *violatorslot, TupleDesc tupdesc,
2347 				   int queryno, bool partgone)
2348 {
2349 	StringInfoData key_names;
2350 	StringInfoData key_values;
2351 	bool		onfk;
2352 	const int16 *attnums;
2353 	Oid			rel_oid;
2354 	AclResult	aclresult;
2355 	bool		has_perm = true;
2356 
2357 	/*
2358 	 * Determine which relation to complain about.  If tupdesc wasn't passed
2359 	 * by caller, assume the violator tuple came from there.
2360 	 */
2361 	onfk = (queryno == RI_PLAN_CHECK_LOOKUPPK);
2362 	if (onfk)
2363 	{
2364 		attnums = riinfo->fk_attnums;
2365 		rel_oid = fk_rel->rd_id;
2366 		if (tupdesc == NULL)
2367 			tupdesc = fk_rel->rd_att;
2368 	}
2369 	else
2370 	{
2371 		attnums = riinfo->pk_attnums;
2372 		rel_oid = pk_rel->rd_id;
2373 		if (tupdesc == NULL)
2374 			tupdesc = pk_rel->rd_att;
2375 	}
2376 
2377 	/*
2378 	 * Check permissions- if the user does not have access to view the data in
2379 	 * any of the key columns then we don't include the errdetail() below.
2380 	 *
2381 	 * Check if RLS is enabled on the relation first.  If so, we don't return
2382 	 * any specifics to avoid leaking data.
2383 	 *
2384 	 * Check table-level permissions next and, failing that, column-level
2385 	 * privileges.
2386 	 *
2387 	 * When a partition at the referenced side is being detached/dropped, we
2388 	 * needn't check, since the user must be the table owner anyway.
2389 	 */
2390 	if (partgone)
2391 		has_perm = true;
2392 	else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
2393 	{
2394 		aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT);
2395 		if (aclresult != ACLCHECK_OK)
2396 		{
2397 			/* Try for column-level permissions */
2398 			for (int idx = 0; idx < riinfo->nkeys; idx++)
2399 			{
2400 				aclresult = pg_attribute_aclcheck(rel_oid, attnums[idx],
2401 												  GetUserId(),
2402 												  ACL_SELECT);
2403 
2404 				/* No access to the key */
2405 				if (aclresult != ACLCHECK_OK)
2406 				{
2407 					has_perm = false;
2408 					break;
2409 				}
2410 			}
2411 		}
2412 	}
2413 	else
2414 		has_perm = false;
2415 
2416 	if (has_perm)
2417 	{
2418 		/* Get printable versions of the keys involved */
2419 		initStringInfo(&key_names);
2420 		initStringInfo(&key_values);
2421 		for (int idx = 0; idx < riinfo->nkeys; idx++)
2422 		{
2423 			int			fnum = attnums[idx];
2424 			Form_pg_attribute att = TupleDescAttr(tupdesc, fnum - 1);
2425 			char	   *name,
2426 					   *val;
2427 			Datum		datum;
2428 			bool		isnull;
2429 
2430 			name = NameStr(att->attname);
2431 
2432 			datum = slot_getattr(violatorslot, fnum, &isnull);
2433 			if (!isnull)
2434 			{
2435 				Oid			foutoid;
2436 				bool		typisvarlena;
2437 
2438 				getTypeOutputInfo(att->atttypid, &foutoid, &typisvarlena);
2439 				val = OidOutputFunctionCall(foutoid, datum);
2440 			}
2441 			else
2442 				val = "null";
2443 
2444 			if (idx > 0)
2445 			{
2446 				appendStringInfoString(&key_names, ", ");
2447 				appendStringInfoString(&key_values, ", ");
2448 			}
2449 			appendStringInfoString(&key_names, name);
2450 			appendStringInfoString(&key_values, val);
2451 		}
2452 	}
2453 
2454 	if (partgone)
2455 		ereport(ERROR,
2456 				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
2457 				 errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"",
2458 						RelationGetRelationName(pk_rel),
2459 						NameStr(riinfo->conname)),
2460 				 errdetail("Key (%s)=(%s) is still referenced from table \"%s\".",
2461 						   key_names.data, key_values.data,
2462 						   RelationGetRelationName(fk_rel)),
2463 				 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
2464 	else if (onfk)
2465 		ereport(ERROR,
2466 				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
2467 				 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
2468 						RelationGetRelationName(fk_rel),
2469 						NameStr(riinfo->conname)),
2470 				 has_perm ?
2471 				 errdetail("Key (%s)=(%s) is not present in table \"%s\".",
2472 						   key_names.data, key_values.data,
2473 						   RelationGetRelationName(pk_rel)) :
2474 				 errdetail("Key is not present in table \"%s\".",
2475 						   RelationGetRelationName(pk_rel)),
2476 				 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
2477 	else
2478 		ereport(ERROR,
2479 				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
2480 				 errmsg("update or delete on table \"%s\" violates foreign key constraint \"%s\" on table \"%s\"",
2481 						RelationGetRelationName(pk_rel),
2482 						NameStr(riinfo->conname),
2483 						RelationGetRelationName(fk_rel)),
2484 				 has_perm ?
2485 				 errdetail("Key (%s)=(%s) is still referenced from table \"%s\".",
2486 						   key_names.data, key_values.data,
2487 						   RelationGetRelationName(fk_rel)) :
2488 				 errdetail("Key is still referenced from table \"%s\".",
2489 						   RelationGetRelationName(fk_rel)),
2490 				 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
2491 }
2492 
2493 
2494 /*
2495  * ri_NullCheck -
2496  *
2497  * Determine the NULL state of all key values in a tuple
2498  *
2499  * Returns one of RI_KEYS_ALL_NULL, RI_KEYS_NONE_NULL or RI_KEYS_SOME_NULL.
2500  */
2501 static int
ri_NullCheck(TupleDesc tupDesc,TupleTableSlot * slot,const RI_ConstraintInfo * riinfo,bool rel_is_pk)2502 ri_NullCheck(TupleDesc tupDesc,
2503 			 TupleTableSlot *slot,
2504 			 const RI_ConstraintInfo *riinfo, bool rel_is_pk)
2505 {
2506 	const int16 *attnums;
2507 	bool		allnull = true;
2508 	bool		nonenull = true;
2509 
2510 	if (rel_is_pk)
2511 		attnums = riinfo->pk_attnums;
2512 	else
2513 		attnums = riinfo->fk_attnums;
2514 
2515 	for (int i = 0; i < riinfo->nkeys; i++)
2516 	{
2517 		if (slot_attisnull(slot, attnums[i]))
2518 			nonenull = false;
2519 		else
2520 			allnull = false;
2521 	}
2522 
2523 	if (allnull)
2524 		return RI_KEYS_ALL_NULL;
2525 
2526 	if (nonenull)
2527 		return RI_KEYS_NONE_NULL;
2528 
2529 	return RI_KEYS_SOME_NULL;
2530 }
2531 
2532 
2533 /*
2534  * ri_InitHashTables -
2535  *
2536  * Initialize our internal hash tables.
2537  */
2538 static void
ri_InitHashTables(void)2539 ri_InitHashTables(void)
2540 {
2541 	HASHCTL		ctl;
2542 
2543 	memset(&ctl, 0, sizeof(ctl));
2544 	ctl.keysize = sizeof(Oid);
2545 	ctl.entrysize = sizeof(RI_ConstraintInfo);
2546 	ri_constraint_cache = hash_create("RI constraint cache",
2547 									  RI_INIT_CONSTRAINTHASHSIZE,
2548 									  &ctl, HASH_ELEM | HASH_BLOBS);
2549 
2550 	/* Arrange to flush cache on pg_constraint changes */
2551 	CacheRegisterSyscacheCallback(CONSTROID,
2552 								  InvalidateConstraintCacheCallBack,
2553 								  (Datum) 0);
2554 
2555 	memset(&ctl, 0, sizeof(ctl));
2556 	ctl.keysize = sizeof(RI_QueryKey);
2557 	ctl.entrysize = sizeof(RI_QueryHashEntry);
2558 	ri_query_cache = hash_create("RI query cache",
2559 								 RI_INIT_QUERYHASHSIZE,
2560 								 &ctl, HASH_ELEM | HASH_BLOBS);
2561 
2562 	memset(&ctl, 0, sizeof(ctl));
2563 	ctl.keysize = sizeof(RI_CompareKey);
2564 	ctl.entrysize = sizeof(RI_CompareHashEntry);
2565 	ri_compare_cache = hash_create("RI compare cache",
2566 								   RI_INIT_QUERYHASHSIZE,
2567 								   &ctl, HASH_ELEM | HASH_BLOBS);
2568 }
2569 
2570 
2571 /*
2572  * ri_FetchPreparedPlan -
2573  *
2574  * Lookup for a query key in our private hash table of prepared
2575  * and saved SPI execution plans. Return the plan if found or NULL.
2576  */
2577 static SPIPlanPtr
ri_FetchPreparedPlan(RI_QueryKey * key)2578 ri_FetchPreparedPlan(RI_QueryKey *key)
2579 {
2580 	RI_QueryHashEntry *entry;
2581 	SPIPlanPtr	plan;
2582 
2583 	/*
2584 	 * On the first call initialize the hashtable
2585 	 */
2586 	if (!ri_query_cache)
2587 		ri_InitHashTables();
2588 
2589 	/*
2590 	 * Lookup for the key
2591 	 */
2592 	entry = (RI_QueryHashEntry *) hash_search(ri_query_cache,
2593 											  (void *) key,
2594 											  HASH_FIND, NULL);
2595 	if (entry == NULL)
2596 		return NULL;
2597 
2598 	/*
2599 	 * Check whether the plan is still valid.  If it isn't, we don't want to
2600 	 * simply rely on plancache.c to regenerate it; rather we should start
2601 	 * from scratch and rebuild the query text too.  This is to cover cases
2602 	 * such as table/column renames.  We depend on the plancache machinery to
2603 	 * detect possible invalidations, though.
2604 	 *
2605 	 * CAUTION: this check is only trustworthy if the caller has already
2606 	 * locked both FK and PK rels.
2607 	 */
2608 	plan = entry->plan;
2609 	if (plan && SPI_plan_is_valid(plan))
2610 		return plan;
2611 
2612 	/*
2613 	 * Otherwise we might as well flush the cached plan now, to free a little
2614 	 * memory space before we make a new one.
2615 	 */
2616 	entry->plan = NULL;
2617 	if (plan)
2618 		SPI_freeplan(plan);
2619 
2620 	return NULL;
2621 }
2622 
2623 
2624 /*
2625  * ri_HashPreparedPlan -
2626  *
2627  * Add another plan to our private SPI query plan hashtable.
2628  */
2629 static void
ri_HashPreparedPlan(RI_QueryKey * key,SPIPlanPtr plan)2630 ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan)
2631 {
2632 	RI_QueryHashEntry *entry;
2633 	bool		found;
2634 
2635 	/*
2636 	 * On the first call initialize the hashtable
2637 	 */
2638 	if (!ri_query_cache)
2639 		ri_InitHashTables();
2640 
2641 	/*
2642 	 * Add the new plan.  We might be overwriting an entry previously found
2643 	 * invalid by ri_FetchPreparedPlan.
2644 	 */
2645 	entry = (RI_QueryHashEntry *) hash_search(ri_query_cache,
2646 											  (void *) key,
2647 											  HASH_ENTER, &found);
2648 	Assert(!found || entry->plan == NULL);
2649 	entry->plan = plan;
2650 }
2651 
2652 
2653 /*
2654  * ri_KeysEqual -
2655  *
2656  * Check if all key values in OLD and NEW are equal.
2657  *
2658  * Note: at some point we might wish to redefine this as checking for
2659  * "IS NOT DISTINCT" rather than "=", that is, allow two nulls to be
2660  * considered equal.  Currently there is no need since all callers have
2661  * previously found at least one of the rows to contain no nulls.
2662  */
2663 static bool
ri_KeysEqual(Relation rel,TupleTableSlot * oldslot,TupleTableSlot * newslot,const RI_ConstraintInfo * riinfo,bool rel_is_pk)2664 ri_KeysEqual(Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot,
2665 			 const RI_ConstraintInfo *riinfo, bool rel_is_pk)
2666 {
2667 	const int16 *attnums;
2668 
2669 	if (rel_is_pk)
2670 		attnums = riinfo->pk_attnums;
2671 	else
2672 		attnums = riinfo->fk_attnums;
2673 
2674 	/* XXX: could be worthwhile to fetch all necessary attrs at once */
2675 	for (int i = 0; i < riinfo->nkeys; i++)
2676 	{
2677 		Datum		oldvalue;
2678 		Datum		newvalue;
2679 		bool		isnull;
2680 
2681 		/*
2682 		 * Get one attribute's oldvalue. If it is NULL - they're not equal.
2683 		 */
2684 		oldvalue = slot_getattr(oldslot, attnums[i], &isnull);
2685 		if (isnull)
2686 			return false;
2687 
2688 		/*
2689 		 * Get one attribute's newvalue. If it is NULL - they're not equal.
2690 		 */
2691 		newvalue = slot_getattr(newslot, attnums[i], &isnull);
2692 		if (isnull)
2693 			return false;
2694 
2695 		if (rel_is_pk)
2696 		{
2697 			/*
2698 			 * If we are looking at the PK table, then do a bytewise
2699 			 * comparison.  We must propagate PK changes if the value is
2700 			 * changed to one that "looks" different but would compare as
2701 			 * equal using the equality operator.  This only makes a
2702 			 * difference for ON UPDATE CASCADE, but for consistency we treat
2703 			 * all changes to the PK the same.
2704 			 */
2705 			Form_pg_attribute att = TupleDescAttr(oldslot->tts_tupleDescriptor, attnums[i] - 1);
2706 
2707 			if (!datum_image_eq(oldvalue, newvalue, att->attbyval, att->attlen))
2708 				return false;
2709 		}
2710 		else
2711 		{
2712 			/*
2713 			 * For the FK table, compare with the appropriate equality
2714 			 * operator.  Changes that compare equal will still satisfy the
2715 			 * constraint after the update.
2716 			 */
2717 			if (!ri_AttributesEqual(riinfo->ff_eq_oprs[i], RIAttType(rel, attnums[i]),
2718 									oldvalue, newvalue))
2719 				return false;
2720 		}
2721 	}
2722 
2723 	return true;
2724 }
2725 
2726 
2727 /*
2728  * ri_AttributesEqual -
2729  *
2730  * Call the appropriate equality comparison operator for two values.
2731  *
2732  * NB: we have already checked that neither value is null.
2733  */
2734 static bool
ri_AttributesEqual(Oid eq_opr,Oid typeid,Datum oldvalue,Datum newvalue)2735 ri_AttributesEqual(Oid eq_opr, Oid typeid,
2736 				   Datum oldvalue, Datum newvalue)
2737 {
2738 	RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
2739 
2740 	/* Do we need to cast the values? */
2741 	if (OidIsValid(entry->cast_func_finfo.fn_oid))
2742 	{
2743 		oldvalue = FunctionCall3(&entry->cast_func_finfo,
2744 								 oldvalue,
2745 								 Int32GetDatum(-1), /* typmod */
2746 								 BoolGetDatum(false));	/* implicit coercion */
2747 		newvalue = FunctionCall3(&entry->cast_func_finfo,
2748 								 newvalue,
2749 								 Int32GetDatum(-1), /* typmod */
2750 								 BoolGetDatum(false));	/* implicit coercion */
2751 	}
2752 
2753 	/*
2754 	 * Apply the comparison operator.
2755 	 *
2756 	 * Note: This function is part of a call stack that determines whether an
2757 	 * update to a row is significant enough that it needs checking or action
2758 	 * on the other side of a foreign-key constraint.  Therefore, the
2759 	 * comparison here would need to be done with the collation of the *other*
2760 	 * table.  For simplicity (e.g., we might not even have the other table
2761 	 * open), we'll just use the default collation here, which could lead to
2762 	 * some false negatives.  All this would break if we ever allow
2763 	 * database-wide collations to be nondeterministic.
2764 	 */
2765 	return DatumGetBool(FunctionCall2Coll(&entry->eq_opr_finfo,
2766 										  DEFAULT_COLLATION_OID,
2767 										  oldvalue, newvalue));
2768 }
2769 
2770 /*
2771  * ri_HashCompareOp -
2772  *
2773  * See if we know how to compare two values, and create a new hash entry
2774  * if not.
2775  */
2776 static RI_CompareHashEntry *
ri_HashCompareOp(Oid eq_opr,Oid typeid)2777 ri_HashCompareOp(Oid eq_opr, Oid typeid)
2778 {
2779 	RI_CompareKey key;
2780 	RI_CompareHashEntry *entry;
2781 	bool		found;
2782 
2783 	/*
2784 	 * On the first call initialize the hashtable
2785 	 */
2786 	if (!ri_compare_cache)
2787 		ri_InitHashTables();
2788 
2789 	/*
2790 	 * Find or create a hash entry.  Note we're assuming RI_CompareKey
2791 	 * contains no struct padding.
2792 	 */
2793 	key.eq_opr = eq_opr;
2794 	key.typeid = typeid;
2795 	entry = (RI_CompareHashEntry *) hash_search(ri_compare_cache,
2796 												(void *) &key,
2797 												HASH_ENTER, &found);
2798 	if (!found)
2799 		entry->valid = false;
2800 
2801 	/*
2802 	 * If not already initialized, do so.  Since we'll keep this hash entry
2803 	 * for the life of the backend, put any subsidiary info for the function
2804 	 * cache structs into TopMemoryContext.
2805 	 */
2806 	if (!entry->valid)
2807 	{
2808 		Oid			lefttype,
2809 					righttype,
2810 					castfunc;
2811 		CoercionPathType pathtype;
2812 
2813 		/* We always need to know how to call the equality operator */
2814 		fmgr_info_cxt(get_opcode(eq_opr), &entry->eq_opr_finfo,
2815 					  TopMemoryContext);
2816 
2817 		/*
2818 		 * If we chose to use a cast from FK to PK type, we may have to apply
2819 		 * the cast function to get to the operator's input type.
2820 		 *
2821 		 * XXX eventually it would be good to support array-coercion cases
2822 		 * here and in ri_AttributesEqual().  At the moment there is no point
2823 		 * because cases involving nonidentical array types will be rejected
2824 		 * at constraint creation time.
2825 		 *
2826 		 * XXX perhaps also consider supporting CoerceViaIO?  No need at the
2827 		 * moment since that will never be generated for implicit coercions.
2828 		 */
2829 		op_input_types(eq_opr, &lefttype, &righttype);
2830 		Assert(lefttype == righttype);
2831 		if (typeid == lefttype)
2832 			castfunc = InvalidOid;	/* simplest case */
2833 		else
2834 		{
2835 			pathtype = find_coercion_pathway(lefttype, typeid,
2836 											 COERCION_IMPLICIT,
2837 											 &castfunc);
2838 			if (pathtype != COERCION_PATH_FUNC &&
2839 				pathtype != COERCION_PATH_RELABELTYPE)
2840 			{
2841 				/*
2842 				 * The declared input type of the eq_opr might be a
2843 				 * polymorphic type such as ANYARRAY or ANYENUM, or other
2844 				 * special cases such as RECORD; find_coercion_pathway
2845 				 * currently doesn't subsume these special cases.
2846 				 */
2847 				if (!IsBinaryCoercible(typeid, lefttype))
2848 					elog(ERROR, "no conversion function from %s to %s",
2849 						 format_type_be(typeid),
2850 						 format_type_be(lefttype));
2851 			}
2852 		}
2853 		if (OidIsValid(castfunc))
2854 			fmgr_info_cxt(castfunc, &entry->cast_func_finfo,
2855 						  TopMemoryContext);
2856 		else
2857 			entry->cast_func_finfo.fn_oid = InvalidOid;
2858 		entry->valid = true;
2859 	}
2860 
2861 	return entry;
2862 }
2863 
2864 
2865 /*
2866  * Given a trigger function OID, determine whether it is an RI trigger,
2867  * and if so whether it is attached to PK or FK relation.
2868  */
2869 int
RI_FKey_trigger_type(Oid tgfoid)2870 RI_FKey_trigger_type(Oid tgfoid)
2871 {
2872 	switch (tgfoid)
2873 	{
2874 		case F_RI_FKEY_CASCADE_DEL:
2875 		case F_RI_FKEY_CASCADE_UPD:
2876 		case F_RI_FKEY_RESTRICT_DEL:
2877 		case F_RI_FKEY_RESTRICT_UPD:
2878 		case F_RI_FKEY_SETNULL_DEL:
2879 		case F_RI_FKEY_SETNULL_UPD:
2880 		case F_RI_FKEY_SETDEFAULT_DEL:
2881 		case F_RI_FKEY_SETDEFAULT_UPD:
2882 		case F_RI_FKEY_NOACTION_DEL:
2883 		case F_RI_FKEY_NOACTION_UPD:
2884 			return RI_TRIGGER_PK;
2885 
2886 		case F_RI_FKEY_CHECK_INS:
2887 		case F_RI_FKEY_CHECK_UPD:
2888 			return RI_TRIGGER_FK;
2889 	}
2890 
2891 	return RI_TRIGGER_NONE;
2892 }
2893