/*-------------------------------------------------------------------------
 *
 * ri_triggers.c
 *
 *	Generic trigger procedures for referential integrity constraint
 *	checks.
 *
 *	Note about memory management: the private hashtables kept here live
 *	across query and transaction boundaries, in fact they live as long as
 *	the backend does.  This works because the hashtable structures
 *	themselves are allocated by dynahash.c in its permanent DynaHashCxt,
 *	and the SPI plans they point to are saved using SPI_keepplan().
 *	There is not currently any provision for throwing away a no-longer-needed
 *	plan --- consider improving this someday.
 *
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 *
 * src/backend/utils/adt/ri_triggers.c
 *
 *-------------------------------------------------------------------------
 */
23 
24 #include "postgres.h"
25 
26 #include "access/htup_details.h"
27 #include "access/sysattr.h"
28 #include "access/table.h"
29 #include "access/tableam.h"
30 #include "access/xact.h"
31 #include "catalog/pg_collation.h"
32 #include "catalog/pg_constraint.h"
33 #include "catalog/pg_operator.h"
34 #include "catalog/pg_type.h"
35 #include "commands/trigger.h"
36 #include "executor/executor.h"
37 #include "executor/spi.h"
38 #include "lib/ilist.h"
39 #include "miscadmin.h"
40 #include "parser/parse_coerce.h"
41 #include "parser/parse_relation.h"
42 #include "storage/bufmgr.h"
43 #include "utils/acl.h"
44 #include "utils/builtins.h"
45 #include "utils/datum.h"
46 #include "utils/fmgroids.h"
47 #include "utils/guc.h"
48 #include "utils/inval.h"
49 #include "utils/lsyscache.h"
50 #include "utils/memutils.h"
51 #include "utils/rel.h"
52 #include "utils/rls.h"
53 #include "utils/ruleutils.h"
54 #include "utils/snapmgr.h"
55 #include "utils/syscache.h"
56 
/*
 * Local definitions
 */

/* Upper bound on the number of key columns; defined as the index key limit */
#define RI_MAX_NUMKEYS					INDEX_MAX_KEYS

/*
 * Size constants for the private hash tables.  The query-plan table is
 * sized at four times the constraint table.
 */
#define RI_INIT_CONSTRAINTHASHSIZE		64
#define RI_INIT_QUERYHASHSIZE			(RI_INIT_CONSTRAINTHASHSIZE * 4)

/* Null-ness classification of a key, as returned by ri_NullCheck() */
#define RI_KEYS_ALL_NULL				0
#define RI_KEYS_SOME_NULL				1
#define RI_KEYS_NONE_NULL				2

/* RI query type codes */
/* these queries are executed against the PK (referenced) table: */
#define RI_PLAN_CHECK_LOOKUPPK			1
#define RI_PLAN_CHECK_LOOKUPPK_FROM_PK	2
#define RI_PLAN_LAST_ON_PK				RI_PLAN_CHECK_LOOKUPPK_FROM_PK
/* these queries are executed against the FK (referencing) table: */
#define RI_PLAN_CASCADE_DEL_DODELETE	3
#define RI_PLAN_CASCADE_UPD_DOUPDATE	4
#define RI_PLAN_RESTRICT_CHECKREF		5
#define RI_PLAN_SETNULL_DOUPDATE		6
#define RI_PLAN_SETDEFAULT_DOUPDATE		7

/*
 * Buffer sizes for the output of quoteOneName() and quoteRelationName().
 * NOTE(review): presumably room for doubling every character plus the two
 * surrounding quotes and the terminator -- confirm against quoteOneName().
 */
#define MAX_QUOTED_NAME_LEN  (NAMEDATALEN*2+3)
#define MAX_QUOTED_REL_NAME_LEN  (MAX_QUOTED_NAME_LEN*2)

/* Shorthands to fetch a column's name, type OID and collation OID by attnum */
#define RIAttName(rel, attnum)	NameStr(*attnumAttName(rel, attnum))
#define RIAttType(rel, attnum)	attnumTypeId(rel, attnum)
#define RIAttCollation(rel, attnum) attnumCollationId(rel, attnum)

/* Trigger event kinds passed to ri_CheckTrigger() */
#define RI_TRIGTYPE_INSERT 1
#define RI_TRIGTYPE_UPDATE 2
#define RI_TRIGTYPE_DELETE 3
92 
93 
94 /*
95  * RI_ConstraintInfo
96  *
97  * Information extracted from an FK pg_constraint entry.  This is cached in
98  * ri_constraint_cache.
99  */
100 typedef struct RI_ConstraintInfo
101 {
102 	Oid			constraint_id;	/* OID of pg_constraint entry (hash key) */
103 	bool		valid;			/* successfully initialized? */
104 	Oid			constraint_root_id; /* OID of topmost ancestor constraint;
105 									 * same as constraint_id if not inherited */
106 	uint32		oidHashValue;	/* hash value of constraint_id */
107 	uint32		rootHashValue;	/* hash value of constraint_root_id */
108 	NameData	conname;		/* name of the FK constraint */
109 	Oid			pk_relid;		/* referenced relation */
110 	Oid			fk_relid;		/* referencing relation */
111 	char		confupdtype;	/* foreign key's ON UPDATE action */
112 	char		confdeltype;	/* foreign key's ON DELETE action */
113 	char		confmatchtype;	/* foreign key's match type */
114 	int			nkeys;			/* number of key columns */
115 	int16		pk_attnums[RI_MAX_NUMKEYS]; /* attnums of referenced cols */
116 	int16		fk_attnums[RI_MAX_NUMKEYS]; /* attnums of referencing cols */
117 	Oid			pf_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = FK) */
118 	Oid			pp_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = PK) */
119 	Oid			ff_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (FK = FK) */
120 	dlist_node	valid_link;		/* Link in list of valid entries */
121 } RI_ConstraintInfo;
122 
123 /*
124  * RI_QueryKey
125  *
126  * The key identifying a prepared SPI plan in our query hashtable
127  */
128 typedef struct RI_QueryKey
129 {
130 	Oid			constr_id;		/* OID of pg_constraint entry */
131 	int32		constr_queryno; /* query type ID, see RI_PLAN_XXX above */
132 } RI_QueryKey;
133 
134 /*
135  * RI_QueryHashEntry
136  */
137 typedef struct RI_QueryHashEntry
138 {
139 	RI_QueryKey key;
140 	SPIPlanPtr	plan;
141 } RI_QueryHashEntry;
142 
143 /*
144  * RI_CompareKey
145  *
146  * The key identifying an entry showing how to compare two values
147  */
148 typedef struct RI_CompareKey
149 {
150 	Oid			eq_opr;			/* the equality operator to apply */
151 	Oid			typeid;			/* the data type to apply it to */
152 } RI_CompareKey;
153 
154 /*
155  * RI_CompareHashEntry
156  */
157 typedef struct RI_CompareHashEntry
158 {
159 	RI_CompareKey key;
160 	bool		valid;			/* successfully initialized? */
161 	FmgrInfo	eq_opr_finfo;	/* call info for equality fn */
162 	FmgrInfo	cast_func_finfo;	/* in case we must coerce input */
163 } RI_CompareHashEntry;
164 
165 
166 /*
167  * Local data
168  */
169 static HTAB *ri_constraint_cache = NULL;
170 static HTAB *ri_query_cache = NULL;
171 static HTAB *ri_compare_cache = NULL;
172 static dlist_head ri_constraint_cache_valid_list;
173 static int	ri_constraint_cache_valid_count = 0;
174 
175 
176 /*
177  * Local function prototypes
178  */
179 static bool ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
180 							  TupleTableSlot *oldslot,
181 							  const RI_ConstraintInfo *riinfo);
182 static Datum ri_restrict(TriggerData *trigdata, bool is_no_action);
183 static Datum ri_set(TriggerData *trigdata, bool is_set_null);
184 static void quoteOneName(char *buffer, const char *name);
185 static void quoteRelationName(char *buffer, Relation rel);
186 static void ri_GenerateQual(StringInfo buf,
187 							const char *sep,
188 							const char *leftop, Oid leftoptype,
189 							Oid opoid,
190 							const char *rightop, Oid rightoptype);
191 static void ri_GenerateQualCollation(StringInfo buf, Oid collation);
192 static int	ri_NullCheck(TupleDesc tupdesc, TupleTableSlot *slot,
193 						 const RI_ConstraintInfo *riinfo, bool rel_is_pk);
194 static void ri_BuildQueryKey(RI_QueryKey *key,
195 							 const RI_ConstraintInfo *riinfo,
196 							 int32 constr_queryno);
197 static bool ri_KeysEqual(Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot,
198 						 const RI_ConstraintInfo *riinfo, bool rel_is_pk);
199 static bool ri_AttributesEqual(Oid eq_opr, Oid typeid,
200 							   Datum oldvalue, Datum newvalue);
201 
202 static void ri_InitHashTables(void);
203 static void InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue);
204 static SPIPlanPtr ri_FetchPreparedPlan(RI_QueryKey *key);
205 static void ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan);
206 static RI_CompareHashEntry *ri_HashCompareOp(Oid eq_opr, Oid typeid);
207 
208 static void ri_CheckTrigger(FunctionCallInfo fcinfo, const char *funcname,
209 							int tgkind);
210 static const RI_ConstraintInfo *ri_FetchConstraintInfo(Trigger *trigger,
211 													   Relation trig_rel, bool rel_is_pk);
212 static const RI_ConstraintInfo *ri_LoadConstraintInfo(Oid constraintOid);
213 static Oid	get_ri_constraint_root(Oid constrOid);
214 static SPIPlanPtr ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
215 							   RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel);
216 static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo,
217 							RI_QueryKey *qkey, SPIPlanPtr qplan,
218 							Relation fk_rel, Relation pk_rel,
219 							TupleTableSlot *oldslot, TupleTableSlot *newslot,
220 							bool detectNewRows, int expect_OK);
221 static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
222 							 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
223 							 Datum *vals, char *nulls);
224 static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
225 							   Relation pk_rel, Relation fk_rel,
226 							   TupleTableSlot *violatorslot, TupleDesc tupdesc,
227 							   int queryno, bool partgone) pg_attribute_noreturn();
228 
229 
230 /*
231  * RI_FKey_check -
232  *
233  * Check foreign key existence (combined for INSERT and UPDATE).
234  */
235 static Datum
RI_FKey_check(TriggerData * trigdata)236 RI_FKey_check(TriggerData *trigdata)
237 {
238 	const RI_ConstraintInfo *riinfo;
239 	Relation	fk_rel;
240 	Relation	pk_rel;
241 	TupleTableSlot *newslot;
242 	RI_QueryKey qkey;
243 	SPIPlanPtr	qplan;
244 
245 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
246 									trigdata->tg_relation, false);
247 
248 	if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
249 		newslot = trigdata->tg_newslot;
250 	else
251 		newslot = trigdata->tg_trigslot;
252 
253 	/*
254 	 * We should not even consider checking the row if it is no longer valid,
255 	 * since it was either deleted (so the deferred check should be skipped)
256 	 * or updated (in which case only the latest version of the row should be
257 	 * checked).  Test its liveness according to SnapshotSelf.  We need pin
258 	 * and lock on the buffer to call HeapTupleSatisfiesVisibility.  Caller
259 	 * should be holding pin, but not lock.
260 	 */
261 	if (!table_tuple_satisfies_snapshot(trigdata->tg_relation, newslot, SnapshotSelf))
262 		return PointerGetDatum(NULL);
263 
264 	/*
265 	 * Get the relation descriptors of the FK and PK tables.
266 	 *
267 	 * pk_rel is opened in RowShareLock mode since that's what our eventual
268 	 * SELECT FOR KEY SHARE will get on it.
269 	 */
270 	fk_rel = trigdata->tg_relation;
271 	pk_rel = table_open(riinfo->pk_relid, RowShareLock);
272 
273 	switch (ri_NullCheck(RelationGetDescr(fk_rel), newslot, riinfo, false))
274 	{
275 		case RI_KEYS_ALL_NULL:
276 
277 			/*
278 			 * No further check needed - an all-NULL key passes every type of
279 			 * foreign key constraint.
280 			 */
281 			table_close(pk_rel, RowShareLock);
282 			return PointerGetDatum(NULL);
283 
284 		case RI_KEYS_SOME_NULL:
285 
286 			/*
287 			 * This is the only case that differs between the three kinds of
288 			 * MATCH.
289 			 */
290 			switch (riinfo->confmatchtype)
291 			{
292 				case FKCONSTR_MATCH_FULL:
293 
294 					/*
295 					 * Not allowed - MATCH FULL says either all or none of the
296 					 * attributes can be NULLs
297 					 */
298 					ereport(ERROR,
299 							(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
300 							 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
301 									RelationGetRelationName(fk_rel),
302 									NameStr(riinfo->conname)),
303 							 errdetail("MATCH FULL does not allow mixing of null and nonnull key values."),
304 							 errtableconstraint(fk_rel,
305 												NameStr(riinfo->conname))));
306 					table_close(pk_rel, RowShareLock);
307 					return PointerGetDatum(NULL);
308 
309 				case FKCONSTR_MATCH_SIMPLE:
310 
311 					/*
312 					 * MATCH SIMPLE - if ANY column is null, the key passes
313 					 * the constraint.
314 					 */
315 					table_close(pk_rel, RowShareLock);
316 					return PointerGetDatum(NULL);
317 
318 #ifdef NOT_USED
319 				case FKCONSTR_MATCH_PARTIAL:
320 
321 					/*
322 					 * MATCH PARTIAL - all non-null columns must match. (not
323 					 * implemented, can be done by modifying the query below
324 					 * to only include non-null columns, or by writing a
325 					 * special version here)
326 					 */
327 					break;
328 #endif
329 			}
330 
331 		case RI_KEYS_NONE_NULL:
332 
333 			/*
334 			 * Have a full qualified key - continue below for all three kinds
335 			 * of MATCH.
336 			 */
337 			break;
338 	}
339 
340 	if (SPI_connect() != SPI_OK_CONNECT)
341 		elog(ERROR, "SPI_connect failed");
342 
343 	/* Fetch or prepare a saved plan for the real check */
344 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
345 
346 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
347 	{
348 		StringInfoData querybuf;
349 		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
350 		char		attname[MAX_QUOTED_NAME_LEN];
351 		char		paramname[16];
352 		const char *querysep;
353 		Oid			queryoids[RI_MAX_NUMKEYS];
354 		const char *pk_only;
355 
356 		/* ----------
357 		 * The query string built is
358 		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
359 		 *		   FOR KEY SHARE OF x
360 		 * The type id's for the $ parameters are those of the
361 		 * corresponding FK attributes.
362 		 * ----------
363 		 */
364 		initStringInfo(&querybuf);
365 		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
366 			"" : "ONLY ";
367 		quoteRelationName(pkrelname, pk_rel);
368 		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
369 						 pk_only, pkrelname);
370 		querysep = "WHERE";
371 		for (int i = 0; i < riinfo->nkeys; i++)
372 		{
373 			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
374 			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
375 
376 			quoteOneName(attname,
377 						 RIAttName(pk_rel, riinfo->pk_attnums[i]));
378 			sprintf(paramname, "$%d", i + 1);
379 			ri_GenerateQual(&querybuf, querysep,
380 							attname, pk_type,
381 							riinfo->pf_eq_oprs[i],
382 							paramname, fk_type);
383 			querysep = "AND";
384 			queryoids[i] = fk_type;
385 		}
386 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
387 
388 		/* Prepare and save the plan */
389 		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
390 							 &qkey, fk_rel, pk_rel);
391 	}
392 
393 	/*
394 	 * Now check that foreign key exists in PK table
395 	 *
396 	 * XXX detectNewRows must be true when a partitioned table is on the
397 	 * referenced side.  The reason is that our snapshot must be fresh in
398 	 * order for the hack in find_inheritance_children() to work.
399 	 */
400 	ri_PerformCheck(riinfo, &qkey, qplan,
401 					fk_rel, pk_rel,
402 					NULL, newslot,
403 					pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE,
404 					SPI_OK_SELECT);
405 
406 	if (SPI_finish() != SPI_OK_FINISH)
407 		elog(ERROR, "SPI_finish failed");
408 
409 	table_close(pk_rel, RowShareLock);
410 
411 	return PointerGetDatum(NULL);
412 }
413 
414 
415 /*
416  * RI_FKey_check_ins -
417  *
418  * Check foreign key existence at insert event on FK table.
419  */
420 Datum
RI_FKey_check_ins(PG_FUNCTION_ARGS)421 RI_FKey_check_ins(PG_FUNCTION_ARGS)
422 {
423 	/* Check that this is a valid trigger call on the right time and event. */
424 	ri_CheckTrigger(fcinfo, "RI_FKey_check_ins", RI_TRIGTYPE_INSERT);
425 
426 	/* Share code with UPDATE case. */
427 	return RI_FKey_check((TriggerData *) fcinfo->context);
428 }
429 
430 
431 /*
432  * RI_FKey_check_upd -
433  *
434  * Check foreign key existence at update event on FK table.
435  */
436 Datum
RI_FKey_check_upd(PG_FUNCTION_ARGS)437 RI_FKey_check_upd(PG_FUNCTION_ARGS)
438 {
439 	/* Check that this is a valid trigger call on the right time and event. */
440 	ri_CheckTrigger(fcinfo, "RI_FKey_check_upd", RI_TRIGTYPE_UPDATE);
441 
442 	/* Share code with INSERT case. */
443 	return RI_FKey_check((TriggerData *) fcinfo->context);
444 }
445 
446 
447 /*
448  * ri_Check_Pk_Match
449  *
450  * Check to see if another PK row has been created that provides the same
451  * key values as the "oldslot" that's been modified or deleted in our trigger
452  * event.  Returns true if a match is found in the PK table.
453  *
454  * We assume the caller checked that the oldslot contains no NULL key values,
455  * since otherwise a match is impossible.
456  */
457 static bool
ri_Check_Pk_Match(Relation pk_rel,Relation fk_rel,TupleTableSlot * oldslot,const RI_ConstraintInfo * riinfo)458 ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
459 				  TupleTableSlot *oldslot,
460 				  const RI_ConstraintInfo *riinfo)
461 {
462 	SPIPlanPtr	qplan;
463 	RI_QueryKey qkey;
464 	bool		result;
465 
466 	/* Only called for non-null rows */
467 	Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL);
468 
469 	if (SPI_connect() != SPI_OK_CONNECT)
470 		elog(ERROR, "SPI_connect failed");
471 
472 	/*
473 	 * Fetch or prepare a saved plan for checking PK table with values coming
474 	 * from a PK row
475 	 */
476 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK_FROM_PK);
477 
478 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
479 	{
480 		StringInfoData querybuf;
481 		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
482 		char		attname[MAX_QUOTED_NAME_LEN];
483 		char		paramname[16];
484 		const char *querysep;
485 		const char *pk_only;
486 		Oid			queryoids[RI_MAX_NUMKEYS];
487 
488 		/* ----------
489 		 * The query string built is
490 		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
491 		 *		   FOR KEY SHARE OF x
492 		 * The type id's for the $ parameters are those of the
493 		 * PK attributes themselves.
494 		 * ----------
495 		 */
496 		initStringInfo(&querybuf);
497 		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
498 			"" : "ONLY ";
499 		quoteRelationName(pkrelname, pk_rel);
500 		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
501 						 pk_only, pkrelname);
502 		querysep = "WHERE";
503 		for (int i = 0; i < riinfo->nkeys; i++)
504 		{
505 			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
506 
507 			quoteOneName(attname,
508 						 RIAttName(pk_rel, riinfo->pk_attnums[i]));
509 			sprintf(paramname, "$%d", i + 1);
510 			ri_GenerateQual(&querybuf, querysep,
511 							attname, pk_type,
512 							riinfo->pp_eq_oprs[i],
513 							paramname, pk_type);
514 			querysep = "AND";
515 			queryoids[i] = pk_type;
516 		}
517 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
518 
519 		/* Prepare and save the plan */
520 		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
521 							 &qkey, fk_rel, pk_rel);
522 	}
523 
524 	/*
525 	 * We have a plan now. Run it.
526 	 */
527 	result = ri_PerformCheck(riinfo, &qkey, qplan,
528 							 fk_rel, pk_rel,
529 							 oldslot, NULL,
530 							 true,	/* treat like update */
531 							 SPI_OK_SELECT);
532 
533 	if (SPI_finish() != SPI_OK_FINISH)
534 		elog(ERROR, "SPI_finish failed");
535 
536 	return result;
537 }
538 
539 
540 /*
541  * RI_FKey_noaction_del -
542  *
543  * Give an error and roll back the current transaction if the
544  * delete has resulted in a violation of the given referential
545  * integrity constraint.
546  */
547 Datum
RI_FKey_noaction_del(PG_FUNCTION_ARGS)548 RI_FKey_noaction_del(PG_FUNCTION_ARGS)
549 {
550 	/* Check that this is a valid trigger call on the right time and event. */
551 	ri_CheckTrigger(fcinfo, "RI_FKey_noaction_del", RI_TRIGTYPE_DELETE);
552 
553 	/* Share code with RESTRICT/UPDATE cases. */
554 	return ri_restrict((TriggerData *) fcinfo->context, true);
555 }
556 
557 /*
558  * RI_FKey_restrict_del -
559  *
560  * Restrict delete from PK table to rows unreferenced by foreign key.
561  *
562  * The SQL standard intends that this referential action occur exactly when
563  * the delete is performed, rather than after.  This appears to be
564  * the only difference between "NO ACTION" and "RESTRICT".  In Postgres
565  * we still implement this as an AFTER trigger, but it's non-deferrable.
566  */
567 Datum
RI_FKey_restrict_del(PG_FUNCTION_ARGS)568 RI_FKey_restrict_del(PG_FUNCTION_ARGS)
569 {
570 	/* Check that this is a valid trigger call on the right time and event. */
571 	ri_CheckTrigger(fcinfo, "RI_FKey_restrict_del", RI_TRIGTYPE_DELETE);
572 
573 	/* Share code with NO ACTION/UPDATE cases. */
574 	return ri_restrict((TriggerData *) fcinfo->context, false);
575 }
576 
577 /*
578  * RI_FKey_noaction_upd -
579  *
580  * Give an error and roll back the current transaction if the
581  * update has resulted in a violation of the given referential
582  * integrity constraint.
583  */
584 Datum
RI_FKey_noaction_upd(PG_FUNCTION_ARGS)585 RI_FKey_noaction_upd(PG_FUNCTION_ARGS)
586 {
587 	/* Check that this is a valid trigger call on the right time and event. */
588 	ri_CheckTrigger(fcinfo, "RI_FKey_noaction_upd", RI_TRIGTYPE_UPDATE);
589 
590 	/* Share code with RESTRICT/DELETE cases. */
591 	return ri_restrict((TriggerData *) fcinfo->context, true);
592 }
593 
594 /*
595  * RI_FKey_restrict_upd -
596  *
597  * Restrict update of PK to rows unreferenced by foreign key.
598  *
599  * The SQL standard intends that this referential action occur exactly when
600  * the update is performed, rather than after.  This appears to be
601  * the only difference between "NO ACTION" and "RESTRICT".  In Postgres
602  * we still implement this as an AFTER trigger, but it's non-deferrable.
603  */
604 Datum
RI_FKey_restrict_upd(PG_FUNCTION_ARGS)605 RI_FKey_restrict_upd(PG_FUNCTION_ARGS)
606 {
607 	/* Check that this is a valid trigger call on the right time and event. */
608 	ri_CheckTrigger(fcinfo, "RI_FKey_restrict_upd", RI_TRIGTYPE_UPDATE);
609 
610 	/* Share code with NO ACTION/DELETE cases. */
611 	return ri_restrict((TriggerData *) fcinfo->context, false);
612 }
613 
614 /*
615  * ri_restrict -
616  *
617  * Common code for ON DELETE RESTRICT, ON DELETE NO ACTION,
618  * ON UPDATE RESTRICT, and ON UPDATE NO ACTION.
619  */
620 static Datum
ri_restrict(TriggerData * trigdata,bool is_no_action)621 ri_restrict(TriggerData *trigdata, bool is_no_action)
622 {
623 	const RI_ConstraintInfo *riinfo;
624 	Relation	fk_rel;
625 	Relation	pk_rel;
626 	TupleTableSlot *oldslot;
627 	RI_QueryKey qkey;
628 	SPIPlanPtr	qplan;
629 
630 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
631 									trigdata->tg_relation, true);
632 
633 	/*
634 	 * Get the relation descriptors of the FK and PK tables and the old tuple.
635 	 *
636 	 * fk_rel is opened in RowShareLock mode since that's what our eventual
637 	 * SELECT FOR KEY SHARE will get on it.
638 	 */
639 	fk_rel = table_open(riinfo->fk_relid, RowShareLock);
640 	pk_rel = trigdata->tg_relation;
641 	oldslot = trigdata->tg_trigslot;
642 
643 	/*
644 	 * If another PK row now exists providing the old key values, we should
645 	 * not do anything.  However, this check should only be made in the NO
646 	 * ACTION case; in RESTRICT cases we don't wish to allow another row to be
647 	 * substituted.
648 	 */
649 	if (is_no_action &&
650 		ri_Check_Pk_Match(pk_rel, fk_rel, oldslot, riinfo))
651 	{
652 		table_close(fk_rel, RowShareLock);
653 		return PointerGetDatum(NULL);
654 	}
655 
656 	if (SPI_connect() != SPI_OK_CONNECT)
657 		elog(ERROR, "SPI_connect failed");
658 
659 	/*
660 	 * Fetch or prepare a saved plan for the restrict lookup (it's the same
661 	 * query for delete and update cases)
662 	 */
663 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_RESTRICT_CHECKREF);
664 
665 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
666 	{
667 		StringInfoData querybuf;
668 		char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
669 		char		attname[MAX_QUOTED_NAME_LEN];
670 		char		paramname[16];
671 		const char *querysep;
672 		Oid			queryoids[RI_MAX_NUMKEYS];
673 		const char *fk_only;
674 
675 		/* ----------
676 		 * The query string built is
677 		 *	SELECT 1 FROM [ONLY] <fktable> x WHERE $1 = fkatt1 [AND ...]
678 		 *		   FOR KEY SHARE OF x
679 		 * The type id's for the $ parameters are those of the
680 		 * corresponding PK attributes.
681 		 * ----------
682 		 */
683 		initStringInfo(&querybuf);
684 		fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
685 			"" : "ONLY ";
686 		quoteRelationName(fkrelname, fk_rel);
687 		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
688 						 fk_only, fkrelname);
689 		querysep = "WHERE";
690 		for (int i = 0; i < riinfo->nkeys; i++)
691 		{
692 			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
693 			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
694 			Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
695 			Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
696 
697 			quoteOneName(attname,
698 						 RIAttName(fk_rel, riinfo->fk_attnums[i]));
699 			sprintf(paramname, "$%d", i + 1);
700 			ri_GenerateQual(&querybuf, querysep,
701 							paramname, pk_type,
702 							riinfo->pf_eq_oprs[i],
703 							attname, fk_type);
704 			if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
705 				ri_GenerateQualCollation(&querybuf, pk_coll);
706 			querysep = "AND";
707 			queryoids[i] = pk_type;
708 		}
709 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
710 
711 		/* Prepare and save the plan */
712 		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
713 							 &qkey, fk_rel, pk_rel);
714 	}
715 
716 	/*
717 	 * We have a plan now. Run it to check for existing references.
718 	 */
719 	ri_PerformCheck(riinfo, &qkey, qplan,
720 					fk_rel, pk_rel,
721 					oldslot, NULL,
722 					true,		/* must detect new rows */
723 					SPI_OK_SELECT);
724 
725 	if (SPI_finish() != SPI_OK_FINISH)
726 		elog(ERROR, "SPI_finish failed");
727 
728 	table_close(fk_rel, RowShareLock);
729 
730 	return PointerGetDatum(NULL);
731 }
732 
733 
734 /*
735  * RI_FKey_cascade_del -
736  *
737  * Cascaded delete foreign key references at delete event on PK table.
738  */
739 Datum
RI_FKey_cascade_del(PG_FUNCTION_ARGS)740 RI_FKey_cascade_del(PG_FUNCTION_ARGS)
741 {
742 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
743 	const RI_ConstraintInfo *riinfo;
744 	Relation	fk_rel;
745 	Relation	pk_rel;
746 	TupleTableSlot *oldslot;
747 	RI_QueryKey qkey;
748 	SPIPlanPtr	qplan;
749 
750 	/* Check that this is a valid trigger call on the right time and event. */
751 	ri_CheckTrigger(fcinfo, "RI_FKey_cascade_del", RI_TRIGTYPE_DELETE);
752 
753 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
754 									trigdata->tg_relation, true);
755 
756 	/*
757 	 * Get the relation descriptors of the FK and PK tables and the old tuple.
758 	 *
759 	 * fk_rel is opened in RowExclusiveLock mode since that's what our
760 	 * eventual DELETE will get on it.
761 	 */
762 	fk_rel = table_open(riinfo->fk_relid, RowExclusiveLock);
763 	pk_rel = trigdata->tg_relation;
764 	oldslot = trigdata->tg_trigslot;
765 
766 	if (SPI_connect() != SPI_OK_CONNECT)
767 		elog(ERROR, "SPI_connect failed");
768 
769 	/* Fetch or prepare a saved plan for the cascaded delete */
770 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_DEL_DODELETE);
771 
772 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
773 	{
774 		StringInfoData querybuf;
775 		char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
776 		char		attname[MAX_QUOTED_NAME_LEN];
777 		char		paramname[16];
778 		const char *querysep;
779 		Oid			queryoids[RI_MAX_NUMKEYS];
780 		const char *fk_only;
781 
782 		/* ----------
783 		 * The query string built is
784 		 *	DELETE FROM [ONLY] <fktable> WHERE $1 = fkatt1 [AND ...]
785 		 * The type id's for the $ parameters are those of the
786 		 * corresponding PK attributes.
787 		 * ----------
788 		 */
789 		initStringInfo(&querybuf);
790 		fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
791 			"" : "ONLY ";
792 		quoteRelationName(fkrelname, fk_rel);
793 		appendStringInfo(&querybuf, "DELETE FROM %s%s",
794 						 fk_only, fkrelname);
795 		querysep = "WHERE";
796 		for (int i = 0; i < riinfo->nkeys; i++)
797 		{
798 			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
799 			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
800 			Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
801 			Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
802 
803 			quoteOneName(attname,
804 						 RIAttName(fk_rel, riinfo->fk_attnums[i]));
805 			sprintf(paramname, "$%d", i + 1);
806 			ri_GenerateQual(&querybuf, querysep,
807 							paramname, pk_type,
808 							riinfo->pf_eq_oprs[i],
809 							attname, fk_type);
810 			if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
811 				ri_GenerateQualCollation(&querybuf, pk_coll);
812 			querysep = "AND";
813 			queryoids[i] = pk_type;
814 		}
815 
816 		/* Prepare and save the plan */
817 		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
818 							 &qkey, fk_rel, pk_rel);
819 	}
820 
821 	/*
822 	 * We have a plan now. Build up the arguments from the key values in the
823 	 * deleted PK tuple and delete the referencing rows
824 	 */
825 	ri_PerformCheck(riinfo, &qkey, qplan,
826 					fk_rel, pk_rel,
827 					oldslot, NULL,
828 					true,		/* must detect new rows */
829 					SPI_OK_DELETE);
830 
831 	if (SPI_finish() != SPI_OK_FINISH)
832 		elog(ERROR, "SPI_finish failed");
833 
834 	table_close(fk_rel, RowExclusiveLock);
835 
836 	return PointerGetDatum(NULL);
837 }
838 
839 
840 /*
841  * RI_FKey_cascade_upd -
842  *
843  * Cascaded update foreign key references at update event on PK table.
844  */
845 Datum
RI_FKey_cascade_upd(PG_FUNCTION_ARGS)846 RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
847 {
848 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
849 	const RI_ConstraintInfo *riinfo;
850 	Relation	fk_rel;
851 	Relation	pk_rel;
852 	TupleTableSlot *newslot;
853 	TupleTableSlot *oldslot;
854 	RI_QueryKey qkey;
855 	SPIPlanPtr	qplan;
856 
857 	/* Check that this is a valid trigger call on the right time and event. */
858 	ri_CheckTrigger(fcinfo, "RI_FKey_cascade_upd", RI_TRIGTYPE_UPDATE);
859 
860 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
861 									trigdata->tg_relation, true);
862 
863 	/*
864 	 * Get the relation descriptors of the FK and PK tables and the new and
865 	 * old tuple.
866 	 *
867 	 * fk_rel is opened in RowExclusiveLock mode since that's what our
868 	 * eventual UPDATE will get on it.
869 	 */
870 	fk_rel = table_open(riinfo->fk_relid, RowExclusiveLock);
871 	pk_rel = trigdata->tg_relation;
872 	newslot = trigdata->tg_newslot;
873 	oldslot = trigdata->tg_trigslot;
874 
875 	if (SPI_connect() != SPI_OK_CONNECT)
876 		elog(ERROR, "SPI_connect failed");
877 
878 	/* Fetch or prepare a saved plan for the cascaded update */
879 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_UPD_DOUPDATE);
880 
881 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
882 	{
883 		StringInfoData querybuf;
884 		StringInfoData qualbuf;
885 		char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
886 		char		attname[MAX_QUOTED_NAME_LEN];
887 		char		paramname[16];
888 		const char *querysep;
889 		const char *qualsep;
890 		Oid			queryoids[RI_MAX_NUMKEYS * 2];
891 		const char *fk_only;
892 
893 		/* ----------
894 		 * The query string built is
895 		 *	UPDATE [ONLY] <fktable> SET fkatt1 = $1 [, ...]
896 		 *			WHERE $n = fkatt1 [AND ...]
897 		 * The type id's for the $ parameters are those of the
898 		 * corresponding PK attributes.  Note that we are assuming
899 		 * there is an assignment cast from the PK to the FK type;
900 		 * else the parser will fail.
901 		 * ----------
902 		 */
903 		initStringInfo(&querybuf);
904 		initStringInfo(&qualbuf);
905 		fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
906 			"" : "ONLY ";
907 		quoteRelationName(fkrelname, fk_rel);
908 		appendStringInfo(&querybuf, "UPDATE %s%s SET",
909 						 fk_only, fkrelname);
910 		querysep = "";
911 		qualsep = "WHERE";
912 		for (int i = 0, j = riinfo->nkeys; i < riinfo->nkeys; i++, j++)
913 		{
914 			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
915 			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
916 			Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
917 			Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
918 
919 			quoteOneName(attname,
920 						 RIAttName(fk_rel, riinfo->fk_attnums[i]));
921 			appendStringInfo(&querybuf,
922 							 "%s %s = $%d",
923 							 querysep, attname, i + 1);
924 			sprintf(paramname, "$%d", j + 1);
925 			ri_GenerateQual(&qualbuf, qualsep,
926 							paramname, pk_type,
927 							riinfo->pf_eq_oprs[i],
928 							attname, fk_type);
929 			if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
930 				ri_GenerateQualCollation(&querybuf, pk_coll);
931 			querysep = ",";
932 			qualsep = "AND";
933 			queryoids[i] = pk_type;
934 			queryoids[j] = pk_type;
935 		}
936 		appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len);
937 
938 		/* Prepare and save the plan */
939 		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys * 2, queryoids,
940 							 &qkey, fk_rel, pk_rel);
941 	}
942 
943 	/*
944 	 * We have a plan now. Run it to update the existing references.
945 	 */
946 	ri_PerformCheck(riinfo, &qkey, qplan,
947 					fk_rel, pk_rel,
948 					oldslot, newslot,
949 					true,		/* must detect new rows */
950 					SPI_OK_UPDATE);
951 
952 	if (SPI_finish() != SPI_OK_FINISH)
953 		elog(ERROR, "SPI_finish failed");
954 
955 	table_close(fk_rel, RowExclusiveLock);
956 
957 	return PointerGetDatum(NULL);
958 }
959 
960 
961 /*
962  * RI_FKey_setnull_del -
963  *
964  * Set foreign key references to NULL values at delete event on PK table.
965  */
966 Datum
RI_FKey_setnull_del(PG_FUNCTION_ARGS)967 RI_FKey_setnull_del(PG_FUNCTION_ARGS)
968 {
969 	/* Check that this is a valid trigger call on the right time and event. */
970 	ri_CheckTrigger(fcinfo, "RI_FKey_setnull_del", RI_TRIGTYPE_DELETE);
971 
972 	/* Share code with UPDATE case */
973 	return ri_set((TriggerData *) fcinfo->context, true);
974 }
975 
976 /*
977  * RI_FKey_setnull_upd -
978  *
979  * Set foreign key references to NULL at update event on PK table.
980  */
981 Datum
RI_FKey_setnull_upd(PG_FUNCTION_ARGS)982 RI_FKey_setnull_upd(PG_FUNCTION_ARGS)
983 {
984 	/* Check that this is a valid trigger call on the right time and event. */
985 	ri_CheckTrigger(fcinfo, "RI_FKey_setnull_upd", RI_TRIGTYPE_UPDATE);
986 
987 	/* Share code with DELETE case */
988 	return ri_set((TriggerData *) fcinfo->context, true);
989 }
990 
991 /*
992  * RI_FKey_setdefault_del -
993  *
994  * Set foreign key references to defaults at delete event on PK table.
995  */
996 Datum
RI_FKey_setdefault_del(PG_FUNCTION_ARGS)997 RI_FKey_setdefault_del(PG_FUNCTION_ARGS)
998 {
999 	/* Check that this is a valid trigger call on the right time and event. */
1000 	ri_CheckTrigger(fcinfo, "RI_FKey_setdefault_del", RI_TRIGTYPE_DELETE);
1001 
1002 	/* Share code with UPDATE case */
1003 	return ri_set((TriggerData *) fcinfo->context, false);
1004 }
1005 
1006 /*
1007  * RI_FKey_setdefault_upd -
1008  *
1009  * Set foreign key references to defaults at update event on PK table.
1010  */
1011 Datum
RI_FKey_setdefault_upd(PG_FUNCTION_ARGS)1012 RI_FKey_setdefault_upd(PG_FUNCTION_ARGS)
1013 {
1014 	/* Check that this is a valid trigger call on the right time and event. */
1015 	ri_CheckTrigger(fcinfo, "RI_FKey_setdefault_upd", RI_TRIGTYPE_UPDATE);
1016 
1017 	/* Share code with DELETE case */
1018 	return ri_set((TriggerData *) fcinfo->context, false);
1019 }
1020 
1021 /*
1022  * ri_set -
1023  *
1024  * Common code for ON DELETE SET NULL, ON DELETE SET DEFAULT, ON UPDATE SET
1025  * NULL, and ON UPDATE SET DEFAULT.
1026  */
1027 static Datum
ri_set(TriggerData * trigdata,bool is_set_null)1028 ri_set(TriggerData *trigdata, bool is_set_null)
1029 {
1030 	const RI_ConstraintInfo *riinfo;
1031 	Relation	fk_rel;
1032 	Relation	pk_rel;
1033 	TupleTableSlot *oldslot;
1034 	RI_QueryKey qkey;
1035 	SPIPlanPtr	qplan;
1036 
1037 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
1038 									trigdata->tg_relation, true);
1039 
1040 	/*
1041 	 * Get the relation descriptors of the FK and PK tables and the old tuple.
1042 	 *
1043 	 * fk_rel is opened in RowExclusiveLock mode since that's what our
1044 	 * eventual UPDATE will get on it.
1045 	 */
1046 	fk_rel = table_open(riinfo->fk_relid, RowExclusiveLock);
1047 	pk_rel = trigdata->tg_relation;
1048 	oldslot = trigdata->tg_trigslot;
1049 
1050 	if (SPI_connect() != SPI_OK_CONNECT)
1051 		elog(ERROR, "SPI_connect failed");
1052 
1053 	/*
1054 	 * Fetch or prepare a saved plan for the set null/default operation (it's
1055 	 * the same query for delete and update cases)
1056 	 */
1057 	ri_BuildQueryKey(&qkey, riinfo,
1058 					 (is_set_null
1059 					  ? RI_PLAN_SETNULL_DOUPDATE
1060 					  : RI_PLAN_SETDEFAULT_DOUPDATE));
1061 
1062 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
1063 	{
1064 		StringInfoData querybuf;
1065 		StringInfoData qualbuf;
1066 		char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
1067 		char		attname[MAX_QUOTED_NAME_LEN];
1068 		char		paramname[16];
1069 		const char *querysep;
1070 		const char *qualsep;
1071 		Oid			queryoids[RI_MAX_NUMKEYS];
1072 		const char *fk_only;
1073 
1074 		/* ----------
1075 		 * The query string built is
1076 		 *	UPDATE [ONLY] <fktable> SET fkatt1 = {NULL|DEFAULT} [, ...]
1077 		 *			WHERE $1 = fkatt1 [AND ...]
1078 		 * The type id's for the $ parameters are those of the
1079 		 * corresponding PK attributes.
1080 		 * ----------
1081 		 */
1082 		initStringInfo(&querybuf);
1083 		initStringInfo(&qualbuf);
1084 		fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
1085 			"" : "ONLY ";
1086 		quoteRelationName(fkrelname, fk_rel);
1087 		appendStringInfo(&querybuf, "UPDATE %s%s SET",
1088 						 fk_only, fkrelname);
1089 		querysep = "";
1090 		qualsep = "WHERE";
1091 		for (int i = 0; i < riinfo->nkeys; i++)
1092 		{
1093 			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
1094 			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
1095 			Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
1096 			Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
1097 
1098 			quoteOneName(attname,
1099 						 RIAttName(fk_rel, riinfo->fk_attnums[i]));
1100 			appendStringInfo(&querybuf,
1101 							 "%s %s = %s",
1102 							 querysep, attname,
1103 							 is_set_null ? "NULL" : "DEFAULT");
1104 			sprintf(paramname, "$%d", i + 1);
1105 			ri_GenerateQual(&qualbuf, qualsep,
1106 							paramname, pk_type,
1107 							riinfo->pf_eq_oprs[i],
1108 							attname, fk_type);
1109 			if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
1110 				ri_GenerateQualCollation(&querybuf, pk_coll);
1111 			querysep = ",";
1112 			qualsep = "AND";
1113 			queryoids[i] = pk_type;
1114 		}
1115 		appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len);
1116 
1117 		/* Prepare and save the plan */
1118 		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
1119 							 &qkey, fk_rel, pk_rel);
1120 	}
1121 
1122 	/*
1123 	 * We have a plan now. Run it to update the existing references.
1124 	 */
1125 	ri_PerformCheck(riinfo, &qkey, qplan,
1126 					fk_rel, pk_rel,
1127 					oldslot, NULL,
1128 					true,		/* must detect new rows */
1129 					SPI_OK_UPDATE);
1130 
1131 	if (SPI_finish() != SPI_OK_FINISH)
1132 		elog(ERROR, "SPI_finish failed");
1133 
1134 	table_close(fk_rel, RowExclusiveLock);
1135 
1136 	if (is_set_null)
1137 		return PointerGetDatum(NULL);
1138 	else
1139 	{
1140 		/*
1141 		 * If we just deleted or updated the PK row whose key was equal to the
1142 		 * FK columns' default values, and a referencing row exists in the FK
1143 		 * table, we would have updated that row to the same values it already
1144 		 * had --- and RI_FKey_fk_upd_check_required would hence believe no
1145 		 * check is necessary.  So we need to do another lookup now and in
1146 		 * case a reference still exists, abort the operation.  That is
1147 		 * already implemented in the NO ACTION trigger, so just run it. (This
1148 		 * recheck is only needed in the SET DEFAULT case, since CASCADE would
1149 		 * remove such rows in case of a DELETE operation or would change the
1150 		 * FK key values in case of an UPDATE, while SET NULL is certain to
1151 		 * result in rows that satisfy the FK constraint.)
1152 		 */
1153 		return ri_restrict(trigdata, true);
1154 	}
1155 }
1156 
1157 
1158 /*
1159  * RI_FKey_pk_upd_check_required -
1160  *
1161  * Check if we really need to fire the RI trigger for an update or delete to a PK
1162  * relation.  This is called by the AFTER trigger queue manager to see if
1163  * it can skip queuing an instance of an RI trigger.  Returns true if the
1164  * trigger must be fired, false if we can prove the constraint will still
1165  * be satisfied.
1166  *
1167  * newslot will be NULL if this is called for a delete.
1168  */
1169 bool
RI_FKey_pk_upd_check_required(Trigger * trigger,Relation pk_rel,TupleTableSlot * oldslot,TupleTableSlot * newslot)1170 RI_FKey_pk_upd_check_required(Trigger *trigger, Relation pk_rel,
1171 							  TupleTableSlot *oldslot, TupleTableSlot *newslot)
1172 {
1173 	const RI_ConstraintInfo *riinfo;
1174 
1175 	riinfo = ri_FetchConstraintInfo(trigger, pk_rel, true);
1176 
1177 	/*
1178 	 * If any old key value is NULL, the row could not have been referenced by
1179 	 * an FK row, so no check is needed.
1180 	 */
1181 	if (ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) != RI_KEYS_NONE_NULL)
1182 		return false;
1183 
1184 	/* If all old and new key values are equal, no check is needed */
1185 	if (newslot && ri_KeysEqual(pk_rel, oldslot, newslot, riinfo, true))
1186 		return false;
1187 
1188 	/* Else we need to fire the trigger. */
1189 	return true;
1190 }
1191 
1192 /*
1193  * RI_FKey_fk_upd_check_required -
1194  *
1195  * Check if we really need to fire the RI trigger for an update to an FK
1196  * relation.  This is called by the AFTER trigger queue manager to see if
1197  * it can skip queuing an instance of an RI trigger.  Returns true if the
1198  * trigger must be fired, false if we can prove the constraint will still
1199  * be satisfied.
1200  */
1201 bool
RI_FKey_fk_upd_check_required(Trigger * trigger,Relation fk_rel,TupleTableSlot * oldslot,TupleTableSlot * newslot)1202 RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel,
1203 							  TupleTableSlot *oldslot, TupleTableSlot *newslot)
1204 {
1205 	const RI_ConstraintInfo *riinfo;
1206 	int			ri_nullcheck;
1207 	Datum		xminDatum;
1208 	TransactionId xmin;
1209 	bool		isnull;
1210 
1211 	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
1212 
1213 	ri_nullcheck = ri_NullCheck(RelationGetDescr(fk_rel), newslot, riinfo, false);
1214 
1215 	/*
1216 	 * If all new key values are NULL, the row satisfies the constraint, so no
1217 	 * check is needed.
1218 	 */
1219 	if (ri_nullcheck == RI_KEYS_ALL_NULL)
1220 		return false;
1221 
1222 	/*
1223 	 * If some new key values are NULL, the behavior depends on the match
1224 	 * type.
1225 	 */
1226 	else if (ri_nullcheck == RI_KEYS_SOME_NULL)
1227 	{
1228 		switch (riinfo->confmatchtype)
1229 		{
1230 			case FKCONSTR_MATCH_SIMPLE:
1231 
1232 				/*
1233 				 * If any new key value is NULL, the row must satisfy the
1234 				 * constraint, so no check is needed.
1235 				 */
1236 				return false;
1237 
1238 			case FKCONSTR_MATCH_PARTIAL:
1239 
1240 				/*
1241 				 * Don't know, must run full check.
1242 				 */
1243 				break;
1244 
1245 			case FKCONSTR_MATCH_FULL:
1246 
1247 				/*
1248 				 * If some new key values are NULL, the row fails the
1249 				 * constraint.  We must not throw error here, because the row
1250 				 * might get invalidated before the constraint is to be
1251 				 * checked, but we should queue the event to apply the check
1252 				 * later.
1253 				 */
1254 				return true;
1255 		}
1256 	}
1257 
1258 	/*
1259 	 * Continues here for no new key values are NULL, or we couldn't decide
1260 	 * yet.
1261 	 */
1262 
1263 	/*
1264 	 * If the original row was inserted by our own transaction, we must fire
1265 	 * the trigger whether or not the keys are equal.  This is because our
1266 	 * UPDATE will invalidate the INSERT so that the INSERT RI trigger will
1267 	 * not do anything; so we had better do the UPDATE check.  (We could skip
1268 	 * this if we knew the INSERT trigger already fired, but there is no easy
1269 	 * way to know that.)
1270 	 */
1271 	xminDatum = slot_getsysattr(oldslot, MinTransactionIdAttributeNumber, &isnull);
1272 	Assert(!isnull);
1273 	xmin = DatumGetTransactionId(xminDatum);
1274 	if (TransactionIdIsCurrentTransactionId(xmin))
1275 		return true;
1276 
1277 	/* If all old and new key values are equal, no check is needed */
1278 	if (ri_KeysEqual(fk_rel, oldslot, newslot, riinfo, false))
1279 		return false;
1280 
1281 	/* Else we need to fire the trigger. */
1282 	return true;
1283 }
1284 
/*
 * RI_Initial_Check -
 *
 * Check an entire table for non-matching values using a single query.
 * This is not a trigger procedure, but is called during ALTER TABLE
 * ADD FOREIGN KEY to validate the initial table contents.
 *
 * We expect that the caller has made provision to prevent any problems
 * caused by concurrent actions. This could be either by locking rel and
 * pkrel at ShareRowExclusiveLock or higher, or by otherwise ensuring
 * that triggers implementing the checks are already active.
 * Hence, we do not need to lock individual rows for the check.
 *
 * If the check fails because the current user doesn't have permissions
 * to read both tables, return false to let our caller know that they will
 * need to do something else to check the constraint.
 *
 * Parameters:
 *	trigger - trigger used to look up the constraint's RI_ConstraintInfo
 *	fk_rel - the referencing (FK) relation whose rows are checked
 *	pk_rel - the referenced (PK) relation
 *
 * Returns false, without running the query, if the permission or row-level
 * security pre-checks do not pass.  Returns true if the query ran and found
 * no offending row.  If an offending row is found, it is reported through
 * ereport(ERROR) or ri_ReportViolation().
 */
bool
RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
{
	const RI_ConstraintInfo *riinfo;
	StringInfoData querybuf;
	char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
	char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
	char		pkattname[MAX_QUOTED_NAME_LEN + 3];
	char		fkattname[MAX_QUOTED_NAME_LEN + 3];
	RangeTblEntry *pkrte;
	RangeTblEntry *fkrte;
	const char *sep;
	const char *fk_only;
	const char *pk_only;
	int			save_nestlevel;
	char		workmembuf[32];
	int			spi_result;
	SPIPlanPtr	qplan;

	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);

	/*
	 * Check to make sure current user has enough permissions to do the test
	 * query.  (If not, caller can fall back to the trigger method, which
	 * works because it changes user IDs on the fly.)
	 *
	 * XXX are there any other show-stopper conditions to check?
	 */
	pkrte = makeNode(RangeTblEntry);
	pkrte->rtekind = RTE_RELATION;
	pkrte->relid = RelationGetRelid(pk_rel);
	pkrte->relkind = pk_rel->rd_rel->relkind;
	pkrte->rellockmode = AccessShareLock;
	pkrte->requiredPerms = ACL_SELECT;

	fkrte = makeNode(RangeTblEntry);
	fkrte->rtekind = RTE_RELATION;
	fkrte->relid = RelationGetRelid(fk_rel);
	fkrte->relkind = fk_rel->rd_rel->relkind;
	fkrte->rellockmode = AccessShareLock;
	fkrte->requiredPerms = ACL_SELECT;

	/* Mark the key columns of both tables as the columns to be selected. */
	for (int i = 0; i < riinfo->nkeys; i++)
	{
		int			attno;

		attno = riinfo->pk_attnums[i] - FirstLowInvalidHeapAttributeNumber;
		pkrte->selectedCols = bms_add_member(pkrte->selectedCols, attno);

		attno = riinfo->fk_attnums[i] - FirstLowInvalidHeapAttributeNumber;
		fkrte->selectedCols = bms_add_member(fkrte->selectedCols, attno);
	}

	if (!ExecCheckRTPerms(list_make2(fkrte, pkrte), false))
		return false;

	/*
	 * Also punt if RLS is enabled on either table unless this role has the
	 * bypassrls right or is the table owner of the table(s) involved which
	 * have RLS enabled.
	 */
	if (!has_bypassrls_privilege(GetUserId()) &&
		((pk_rel->rd_rel->relrowsecurity &&
		  !pg_class_ownercheck(pkrte->relid, GetUserId())) ||
		 (fk_rel->rd_rel->relrowsecurity &&
		  !pg_class_ownercheck(fkrte->relid, GetUserId()))))
		return false;

	/*----------
	 * The query string built is:
	 *	SELECT fk.keycols FROM [ONLY] relname fk
	 *	 LEFT OUTER JOIN [ONLY] pkrelname pk
	 *	 ON (pk.pkkeycol1=fk.keycol1 [AND ...])
	 *	 WHERE pk.pkkeycol1 IS NULL AND
	 * For MATCH SIMPLE:
	 *	 (fk.keycol1 IS NOT NULL [AND ...])
	 * For MATCH FULL:
	 *	 (fk.keycol1 IS NOT NULL [OR ...])
	 *
	 * We attach COLLATE clauses to the operators when comparing columns
	 * that have different collations.
	 *----------
	 */
	initStringInfo(&querybuf);
	appendStringInfoString(&querybuf, "SELECT ");
	sep = "";
	for (int i = 0; i < riinfo->nkeys; i++)
	{
		quoteOneName(fkattname,
					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
		appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
		sep = ", ";
	}

	quoteRelationName(pkrelname, pk_rel);
	quoteRelationName(fkrelname, fk_rel);
	/* ONLY is omitted when the relation is a partitioned table. */
	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
		"" : "ONLY ";
	pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
		"" : "ONLY ";
	appendStringInfo(&querybuf,
					 " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON",
					 fk_only, fkrelname, pk_only, pkrelname);

	/*
	 * Put the table aliases at the front of the name buffers; inside the
	 * loop the quoted column name is written right after this 3-character
	 * prefix.
	 */
	strcpy(pkattname, "pk.");
	strcpy(fkattname, "fk.");
	sep = "(";
	for (int i = 0; i < riinfo->nkeys; i++)
	{
		Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
		Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
		Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
		Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);

		quoteOneName(pkattname + 3,
					 RIAttName(pk_rel, riinfo->pk_attnums[i]));
		quoteOneName(fkattname + 3,
					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
		ri_GenerateQual(&querybuf, sep,
						pkattname, pk_type,
						riinfo->pf_eq_oprs[i],
						fkattname, fk_type);
		if (pk_coll != fk_coll)
			ri_GenerateQualCollation(&querybuf, pk_coll);
		sep = "AND";
	}

	/*
	 * It's sufficient to test any one pk attribute for null to detect a join
	 * failure.
	 *
	 * From here on the name buffers hold the bare quoted column name; the
	 * "pk."/"fk." alias comes from the format strings instead.
	 */
	quoteOneName(pkattname, RIAttName(pk_rel, riinfo->pk_attnums[0]));
	appendStringInfo(&querybuf, ") WHERE pk.%s IS NULL AND (", pkattname);

	sep = "";
	for (int i = 0; i < riinfo->nkeys; i++)
	{
		quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
		appendStringInfo(&querybuf,
						 "%sfk.%s IS NOT NULL",
						 sep, fkattname);
		/* Choose the connective used before the next IS NOT NULL test. */
		switch (riinfo->confmatchtype)
		{
			case FKCONSTR_MATCH_SIMPLE:
				sep = " AND ";
				break;
			case FKCONSTR_MATCH_FULL:
				sep = " OR ";
				break;
		}
	}
	appendStringInfoChar(&querybuf, ')');

	/*
	 * Temporarily increase work_mem so that the check query can be executed
	 * more efficiently.  It seems okay to do this because the query is simple
	 * enough to not use a multiple of work_mem, and one typically would not
	 * have many large foreign-key validations happening concurrently.  So
	 * this seems to meet the criteria for being considered a "maintenance"
	 * operation, and accordingly we use maintenance_work_mem.  However, we
	 * must also set hash_mem_multiplier to 1, since it is surely not okay to
	 * let that get applied to the maintenance_work_mem value.
	 *
	 * We use the equivalent of a function SET option to allow the setting to
	 * persist for exactly the duration of the check query.  guc.c also takes
	 * care of undoing the setting on error.
	 */
	save_nestlevel = NewGUCNestLevel();

	snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem);
	(void) set_config_option("work_mem", workmembuf,
							 PGC_USERSET, PGC_S_SESSION,
							 GUC_ACTION_SAVE, true, 0, false);
	(void) set_config_option("hash_mem_multiplier", "1",
							 PGC_USERSET, PGC_S_SESSION,
							 GUC_ACTION_SAVE, true, 0, false);

	if (SPI_connect() != SPI_OK_CONNECT)
		elog(ERROR, "SPI_connect failed");

	/*
	 * Generate the plan.  We don't need to cache it, and there are no
	 * arguments to the plan.
	 */
	qplan = SPI_prepare(querybuf.data, 0, NULL);

	if (qplan == NULL)
		elog(ERROR, "SPI_prepare returned %s for %s",
			 SPI_result_code_string(SPI_result), querybuf.data);

	/*
	 * Run the plan.  For safety we force a current snapshot to be used. (In
	 * transaction-snapshot mode, this arguably violates transaction isolation
	 * rules, but we really haven't got much choice.) We don't need to
	 * register the snapshot, because SPI_execute_snapshot will see to it. We
	 * need at most one tuple returned, so pass limit = 1.
	 */
	spi_result = SPI_execute_snapshot(qplan,
									  NULL, NULL,
									  GetLatestSnapshot(),
									  InvalidSnapshot,
									  true, false, 1);

	/* Check result */
	if (spi_result != SPI_OK_SELECT)
		elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));

	/* Did we find a tuple violating the constraint? */
	if (SPI_processed > 0)
	{
		TupleTableSlot *slot;
		HeapTuple	tuple = SPI_tuptable->vals[0];
		TupleDesc	tupdesc = SPI_tuptable->tupdesc;
		RI_ConstraintInfo fake_riinfo;

		/* Load the offending result row into a virtual-tuple slot. */
		slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);

		heap_deform_tuple(tuple, tupdesc,
						  slot->tts_values, slot->tts_isnull);
		ExecStoreVirtualTuple(slot);

		/*
		 * The columns to look at in the result tuple are 1..N, not whatever
		 * they are in the fk_rel.  Hack up riinfo so that the subroutines
		 * called here will behave properly.
		 *
		 * In addition to this, we have to pass the correct tupdesc to
		 * ri_ReportViolation, overriding its normal habit of using the pk_rel
		 * or fk_rel's tupdesc.
		 */
		memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
		for (int i = 0; i < fake_riinfo.nkeys; i++)
			fake_riinfo.fk_attnums[i] = i + 1;

		/*
		 * If it's MATCH FULL, and there are any nulls in the FK keys,
		 * complain about that rather than the lack of a match.  MATCH FULL
		 * disallows partially-null FK rows.
		 */
		if (fake_riinfo.confmatchtype == FKCONSTR_MATCH_FULL &&
			ri_NullCheck(tupdesc, slot, &fake_riinfo, false) != RI_KEYS_NONE_NULL)
			ereport(ERROR,
					(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
					 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
							RelationGetRelationName(fk_rel),
							NameStr(fake_riinfo.conname)),
					 errdetail("MATCH FULL does not allow mixing of null and nonnull key values."),
					 errtableconstraint(fk_rel,
										NameStr(fake_riinfo.conname))));

		/*
		 * We tell ri_ReportViolation we were doing the RI_PLAN_CHECK_LOOKUPPK
		 * query, which isn't true, but will cause it to use
		 * fake_riinfo.fk_attnums as we need.
		 */
		ri_ReportViolation(&fake_riinfo,
						   pk_rel, fk_rel,
						   slot, tupdesc,
						   RI_PLAN_CHECK_LOOKUPPK, false);

		ExecDropSingleTupleTableSlot(slot);
	}

	if (SPI_finish() != SPI_OK_FINISH)
		elog(ERROR, "SPI_finish failed");

	/*
	 * Restore work_mem and hash_mem_multiplier.
	 */
	AtEOXact_GUC(true, save_nestlevel);

	return true;
}
1575 
/*
 * RI_PartitionRemove_Check -
 *
 * Verify no referencing values exist, when a partition is detached on
 * the referenced side of a foreign key constraint.
 *
 * Parameters:
 *	trigger - trigger used to look up the constraint's RI_ConstraintInfo
 *	fk_rel - the referencing (FK) relation
 *	pk_rel - the relation on the referenced side; its partition constraint
 *			 (if not empty) restricts which rows the query considers
 *
 * Returns nothing.  If the check query finds a row, that row is handed to
 * ri_ReportViolation().
 */
void
RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
{
	const RI_ConstraintInfo *riinfo;
	StringInfoData querybuf;
	char	   *constraintDef;
	char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
	char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
	char		pkattname[MAX_QUOTED_NAME_LEN + 3];
	char		fkattname[MAX_QUOTED_NAME_LEN + 3];
	const char *sep;
	const char *fk_only;
	int			save_nestlevel;
	char		workmembuf[32];
	int			spi_result;
	SPIPlanPtr	qplan;
	int			i;

	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);

	/*
	 * We don't check permissions before displaying the error message, on the
	 * assumption that the user detaching the partition must have enough
	 * privileges to examine the table contents anyhow.
	 */

	/*----------
	 * The query string built is:
	 *  SELECT fk.keycols FROM [ONLY] relname fk
	 *    JOIN pkrelname pk
	 *    ON (pk.pkkeycol1=fk.keycol1 [AND ...])
	 *    WHERE (<partition constraint>) AND
	 * For MATCH SIMPLE:
	 *   (fk.keycol1 IS NOT NULL [AND ...])
	 * For MATCH FULL:
	 *   (fk.keycol1 IS NOT NULL [OR ...])
	 *
	 * We attach COLLATE clauses to the operators when comparing columns
	 * that have different collations.
	 *----------
	 */
	initStringInfo(&querybuf);
	appendStringInfoString(&querybuf, "SELECT ");
	sep = "";
	for (i = 0; i < riinfo->nkeys; i++)
	{
		quoteOneName(fkattname,
					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
		appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
		sep = ", ";
	}

	quoteRelationName(pkrelname, pk_rel);
	quoteRelationName(fkrelname, fk_rel);
	/* ONLY is omitted when the FK relation is a partitioned table. */
	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
		"" : "ONLY ";
	appendStringInfo(&querybuf,
					 " FROM %s%s fk JOIN %s pk ON",
					 fk_only, fkrelname, pkrelname);

	/*
	 * Put the table aliases at the front of the name buffers; inside the
	 * loop the quoted column name is written right after this 3-character
	 * prefix.
	 */
	strcpy(pkattname, "pk.");
	strcpy(fkattname, "fk.");
	sep = "(";
	for (i = 0; i < riinfo->nkeys; i++)
	{
		Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
		Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
		Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
		Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);

		quoteOneName(pkattname + 3,
					 RIAttName(pk_rel, riinfo->pk_attnums[i]));
		quoteOneName(fkattname + 3,
					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
		ri_GenerateQual(&querybuf, sep,
						pkattname, pk_type,
						riinfo->pf_eq_oprs[i],
						fkattname, fk_type);
		if (pk_coll != fk_coll)
			ri_GenerateQualCollation(&querybuf, pk_coll);
		sep = "AND";
	}

	/*
	 * Start the WHERE clause with the partition constraint (except if this is
	 * the default partition and there's no other partition, because the
	 * partition constraint is the empty string in that case.)
	 */
	constraintDef = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk");
	if (constraintDef && constraintDef[0] != '\0')
		appendStringInfo(&querybuf, ") WHERE %s AND (",
						 constraintDef);
	else
		appendStringInfoString(&querybuf, ") WHERE (");

	/*
	 * From here on fkattname holds the bare quoted column name; the "fk."
	 * alias comes from the format string instead.
	 */
	sep = "";
	for (i = 0; i < riinfo->nkeys; i++)
	{
		quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
		appendStringInfo(&querybuf,
						 "%sfk.%s IS NOT NULL",
						 sep, fkattname);
		/* Choose the connective used before the next IS NOT NULL test. */
		switch (riinfo->confmatchtype)
		{
			case FKCONSTR_MATCH_SIMPLE:
				sep = " AND ";
				break;
			case FKCONSTR_MATCH_FULL:
				sep = " OR ";
				break;
		}
	}
	appendStringInfoChar(&querybuf, ')');

	/*
	 * Temporarily increase work_mem so that the check query can be executed
	 * more efficiently.  It seems okay to do this because the query is simple
	 * enough to not use a multiple of work_mem, and one typically would not
	 * have many large foreign-key validations happening concurrently.  So
	 * this seems to meet the criteria for being considered a "maintenance"
	 * operation, and accordingly we use maintenance_work_mem.  However, we
	 * must also set hash_mem_multiplier to 1, since it is surely not okay to
	 * let that get applied to the maintenance_work_mem value.
	 *
	 * We use the equivalent of a function SET option to allow the setting to
	 * persist for exactly the duration of the check query.  guc.c also takes
	 * care of undoing the setting on error.
	 */
	save_nestlevel = NewGUCNestLevel();

	snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem);
	(void) set_config_option("work_mem", workmembuf,
							 PGC_USERSET, PGC_S_SESSION,
							 GUC_ACTION_SAVE, true, 0, false);
	(void) set_config_option("hash_mem_multiplier", "1",
							 PGC_USERSET, PGC_S_SESSION,
							 GUC_ACTION_SAVE, true, 0, false);

	if (SPI_connect() != SPI_OK_CONNECT)
		elog(ERROR, "SPI_connect failed");

	/*
	 * Generate the plan.  We don't need to cache it, and there are no
	 * arguments to the plan.
	 */
	qplan = SPI_prepare(querybuf.data, 0, NULL);

	if (qplan == NULL)
		elog(ERROR, "SPI_prepare returned %s for %s",
			 SPI_result_code_string(SPI_result), querybuf.data);

	/*
	 * Run the plan.  For safety we force a current snapshot to be used. (In
	 * transaction-snapshot mode, this arguably violates transaction isolation
	 * rules, but we really haven't got much choice.) We don't need to
	 * register the snapshot, because SPI_execute_snapshot will see to it. We
	 * need at most one tuple returned, so pass limit = 1.
	 */
	spi_result = SPI_execute_snapshot(qplan,
									  NULL, NULL,
									  GetLatestSnapshot(),
									  InvalidSnapshot,
									  true, false, 1);

	/* Check result */
	if (spi_result != SPI_OK_SELECT)
		elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));

	/* Did we find a tuple that would violate the constraint? */
	if (SPI_processed > 0)
	{
		TupleTableSlot *slot;
		HeapTuple	tuple = SPI_tuptable->vals[0];
		TupleDesc	tupdesc = SPI_tuptable->tupdesc;
		RI_ConstraintInfo fake_riinfo;

		/* Load the offending result row into a virtual-tuple slot. */
		slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);

		heap_deform_tuple(tuple, tupdesc,
						  slot->tts_values, slot->tts_isnull);
		ExecStoreVirtualTuple(slot);

		/*
		 * The columns to look at in the result tuple are 1..N, not whatever
		 * they are in the fk_rel.  Hack up riinfo so that ri_ReportViolation
		 * will behave properly.
		 *
		 * In addition to this, we have to pass the correct tupdesc to
		 * ri_ReportViolation, overriding its normal habit of using the pk_rel
		 * or fk_rel's tupdesc.
		 */
		memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
		for (i = 0; i < fake_riinfo.nkeys; i++)
			fake_riinfo.pk_attnums[i] = i + 1;

		ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
						   slot, tupdesc, 0, true);
	}

	if (SPI_finish() != SPI_OK_FINISH)
		elog(ERROR, "SPI_finish failed");

	/*
	 * Restore work_mem and hash_mem_multiplier.
	 */
	AtEOXact_GUC(true, save_nestlevel);
}
1788 
1789 
1790 /* ----------
1791  * Local functions below
1792  * ----------
1793  */
1794 
1795 
/*
 * quoteOneName --- safely quote a single SQL name
 *
 * The name is always wrapped in double quotes, and every double quote
 * inside it is doubled.  The result is written to buffer, which must be
 * MAX_QUOTED_NAME_LEN long (includes room for \0).
 */
static void
quoteOneName(char *buffer, const char *name)
{
	char	   *out = buffer;
	const char *in;

	/* No attempt is made to detect names that need no quoting. */
	*out++ = '"';
	for (in = name; *in != '\0'; in++)
	{
		/* An embedded quote is emitted twice. */
		if (*in == '"')
			*out++ = '"';
		*out++ = *in;
	}
	*out++ = '"';
	*out = '\0';
}
1815 
1816 /*
1817  * quoteRelationName --- safely quote a fully qualified relation name
1818  *
1819  * buffer must be MAX_QUOTED_REL_NAME_LEN long (includes room for \0)
1820  */
1821 static void
quoteRelationName(char * buffer,Relation rel)1822 quoteRelationName(char *buffer, Relation rel)
1823 {
1824 	quoteOneName(buffer, get_namespace_name(RelationGetNamespace(rel)));
1825 	buffer += strlen(buffer);
1826 	*buffer++ = '.';
1827 	quoteOneName(buffer, RelationGetRelationName(rel));
1828 }
1829 
1830 /*
1831  * ri_GenerateQual --- generate a WHERE clause equating two variables
1832  *
1833  * This basically appends " sep leftop op rightop" to buf, adding casts
1834  * and schema qualification as needed to ensure that the parser will select
1835  * the operator we specify.  leftop and rightop should be parenthesized
1836  * if they aren't variables or parameters.
1837  */
1838 static void
ri_GenerateQual(StringInfo buf,const char * sep,const char * leftop,Oid leftoptype,Oid opoid,const char * rightop,Oid rightoptype)1839 ri_GenerateQual(StringInfo buf,
1840 				const char *sep,
1841 				const char *leftop, Oid leftoptype,
1842 				Oid opoid,
1843 				const char *rightop, Oid rightoptype)
1844 {
1845 	appendStringInfo(buf, " %s ", sep);
1846 	generate_operator_clause(buf, leftop, leftoptype, opoid,
1847 							 rightop, rightoptype);
1848 }
1849 
1850 /*
1851  * ri_GenerateQualCollation --- add a COLLATE spec to a WHERE clause
1852  *
1853  * At present, we intentionally do not use this function for RI queries that
1854  * compare a variable to a $n parameter.  Since parameter symbols always have
1855  * default collation, the effect will be to use the variable's collation.
1856  * Now that is only strictly correct when testing the referenced column, since
1857  * the SQL standard specifies that RI comparisons should use the referenced
1858  * column's collation.  However, so long as all collations have the same
1859  * notion of equality (which they do, because texteq reduces to bitwise
1860  * equality), there's no visible semantic impact from using the referencing
1861  * column's collation when testing it, and this is a good thing to do because
1862  * it lets us use a normal index on the referencing column.  However, we do
1863  * have to use this function when directly comparing the referencing and
1864  * referenced columns, if they are of different collations; else the parser
1865  * will fail to resolve the collation to use.
1866  */
1867 static void
ri_GenerateQualCollation(StringInfo buf,Oid collation)1868 ri_GenerateQualCollation(StringInfo buf, Oid collation)
1869 {
1870 	HeapTuple	tp;
1871 	Form_pg_collation colltup;
1872 	char	   *collname;
1873 	char		onename[MAX_QUOTED_NAME_LEN];
1874 
1875 	/* Nothing to do if it's a noncollatable data type */
1876 	if (!OidIsValid(collation))
1877 		return;
1878 
1879 	tp = SearchSysCache1(COLLOID, ObjectIdGetDatum(collation));
1880 	if (!HeapTupleIsValid(tp))
1881 		elog(ERROR, "cache lookup failed for collation %u", collation);
1882 	colltup = (Form_pg_collation) GETSTRUCT(tp);
1883 	collname = NameStr(colltup->collname);
1884 
1885 	/*
1886 	 * We qualify the name always, for simplicity and to ensure the query is
1887 	 * not search-path-dependent.
1888 	 */
1889 	quoteOneName(onename, get_namespace_name(colltup->collnamespace));
1890 	appendStringInfo(buf, " COLLATE %s", onename);
1891 	quoteOneName(onename, collname);
1892 	appendStringInfo(buf, ".%s", onename);
1893 
1894 	ReleaseSysCache(tp);
1895 }
1896 
1897 /* ----------
1898  * ri_BuildQueryKey -
1899  *
1900  *	Construct a hashtable key for a prepared SPI plan of an FK constraint.
1901  *
1902  *		key: output argument, *key is filled in based on the other arguments
1903  *		riinfo: info derived from pg_constraint entry
1904  *		constr_queryno: an internal number identifying the query type
1905  *			(see RI_PLAN_XXX constants at head of file)
1906  * ----------
1907  */
1908 static void
ri_BuildQueryKey(RI_QueryKey * key,const RI_ConstraintInfo * riinfo,int32 constr_queryno)1909 ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
1910 				 int32 constr_queryno)
1911 {
1912 	/*
1913 	 * Inherited constraints with a common ancestor can share ri_query_cache
1914 	 * entries for all query types except RI_PLAN_CHECK_LOOKUPPK_FROM_PK.
1915 	 * Except in that case, the query processes the other table involved in
1916 	 * the FK constraint (i.e., not the table on which the trigger has been
1917 	 * fired), and so it will be the same for all members of the inheritance
1918 	 * tree.  So we may use the root constraint's OID in the hash key, rather
1919 	 * than the constraint's own OID.  This avoids creating duplicate SPI
1920 	 * plans, saving lots of work and memory when there are many partitions
1921 	 * with similar FK constraints.
1922 	 *
1923 	 * (Note that we must still have a separate RI_ConstraintInfo for each
1924 	 * constraint, because partitions can have different column orders,
1925 	 * resulting in different pk_attnums[] or fk_attnums[] array contents.)
1926 	 *
1927 	 * We assume struct RI_QueryKey contains no padding bytes, else we'd need
1928 	 * to use memset to clear them.
1929 	 */
1930 	if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK)
1931 		key->constr_id = riinfo->constraint_root_id;
1932 	else
1933 		key->constr_id = riinfo->constraint_id;
1934 	key->constr_queryno = constr_queryno;
1935 }
1936 
1937 /*
1938  * Check that RI trigger function was called in expected context
1939  */
1940 static void
ri_CheckTrigger(FunctionCallInfo fcinfo,const char * funcname,int tgkind)1941 ri_CheckTrigger(FunctionCallInfo fcinfo, const char *funcname, int tgkind)
1942 {
1943 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
1944 
1945 	if (!CALLED_AS_TRIGGER(fcinfo))
1946 		ereport(ERROR,
1947 				(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1948 				 errmsg("function \"%s\" was not called by trigger manager", funcname)));
1949 
1950 	/*
1951 	 * Check proper event
1952 	 */
1953 	if (!TRIGGER_FIRED_AFTER(trigdata->tg_event) ||
1954 		!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
1955 		ereport(ERROR,
1956 				(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1957 				 errmsg("function \"%s\" must be fired AFTER ROW", funcname)));
1958 
1959 	switch (tgkind)
1960 	{
1961 		case RI_TRIGTYPE_INSERT:
1962 			if (!TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
1963 				ereport(ERROR,
1964 						(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1965 						 errmsg("function \"%s\" must be fired for INSERT", funcname)));
1966 			break;
1967 		case RI_TRIGTYPE_UPDATE:
1968 			if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
1969 				ereport(ERROR,
1970 						(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1971 						 errmsg("function \"%s\" must be fired for UPDATE", funcname)));
1972 			break;
1973 		case RI_TRIGTYPE_DELETE:
1974 			if (!TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
1975 				ereport(ERROR,
1976 						(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1977 						 errmsg("function \"%s\" must be fired for DELETE", funcname)));
1978 			break;
1979 	}
1980 }
1981 
1982 
1983 /*
1984  * Fetch the RI_ConstraintInfo struct for the trigger's FK constraint.
1985  */
1986 static const RI_ConstraintInfo *
ri_FetchConstraintInfo(Trigger * trigger,Relation trig_rel,bool rel_is_pk)1987 ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
1988 {
1989 	Oid			constraintOid = trigger->tgconstraint;
1990 	const RI_ConstraintInfo *riinfo;
1991 
1992 	/*
1993 	 * Check that the FK constraint's OID is available; it might not be if
1994 	 * we've been invoked via an ordinary trigger or an old-style "constraint
1995 	 * trigger".
1996 	 */
1997 	if (!OidIsValid(constraintOid))
1998 		ereport(ERROR,
1999 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2000 				 errmsg("no pg_constraint entry for trigger \"%s\" on table \"%s\"",
2001 						trigger->tgname, RelationGetRelationName(trig_rel)),
2002 				 errhint("Remove this referential integrity trigger and its mates, then do ALTER TABLE ADD CONSTRAINT.")));
2003 
2004 	/* Find or create a hashtable entry for the constraint */
2005 	riinfo = ri_LoadConstraintInfo(constraintOid);
2006 
2007 	/* Do some easy cross-checks against the trigger call data */
2008 	if (rel_is_pk)
2009 	{
2010 		if (riinfo->fk_relid != trigger->tgconstrrelid ||
2011 			riinfo->pk_relid != RelationGetRelid(trig_rel))
2012 			elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
2013 				 trigger->tgname, RelationGetRelationName(trig_rel));
2014 	}
2015 	else
2016 	{
2017 		if (riinfo->fk_relid != RelationGetRelid(trig_rel) ||
2018 			riinfo->pk_relid != trigger->tgconstrrelid)
2019 			elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
2020 				 trigger->tgname, RelationGetRelationName(trig_rel));
2021 	}
2022 
2023 	if (riinfo->confmatchtype != FKCONSTR_MATCH_FULL &&
2024 		riinfo->confmatchtype != FKCONSTR_MATCH_PARTIAL &&
2025 		riinfo->confmatchtype != FKCONSTR_MATCH_SIMPLE)
2026 		elog(ERROR, "unrecognized confmatchtype: %d",
2027 			 riinfo->confmatchtype);
2028 
2029 	if (riinfo->confmatchtype == FKCONSTR_MATCH_PARTIAL)
2030 		ereport(ERROR,
2031 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2032 				 errmsg("MATCH PARTIAL not yet implemented")));
2033 
2034 	return riinfo;
2035 }
2036 
2037 /*
2038  * Fetch or create the RI_ConstraintInfo struct for an FK constraint.
2039  */
2040 static const RI_ConstraintInfo *
ri_LoadConstraintInfo(Oid constraintOid)2041 ri_LoadConstraintInfo(Oid constraintOid)
2042 {
2043 	RI_ConstraintInfo *riinfo;
2044 	bool		found;
2045 	HeapTuple	tup;
2046 	Form_pg_constraint conForm;
2047 
2048 	/*
2049 	 * On the first call initialize the hashtable
2050 	 */
2051 	if (!ri_constraint_cache)
2052 		ri_InitHashTables();
2053 
2054 	/*
2055 	 * Find or create a hash entry.  If we find a valid one, just return it.
2056 	 */
2057 	riinfo = (RI_ConstraintInfo *) hash_search(ri_constraint_cache,
2058 											   (void *) &constraintOid,
2059 											   HASH_ENTER, &found);
2060 	if (!found)
2061 		riinfo->valid = false;
2062 	else if (riinfo->valid)
2063 		return riinfo;
2064 
2065 	/*
2066 	 * Fetch the pg_constraint row so we can fill in the entry.
2067 	 */
2068 	tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraintOid));
2069 	if (!HeapTupleIsValid(tup)) /* should not happen */
2070 		elog(ERROR, "cache lookup failed for constraint %u", constraintOid);
2071 	conForm = (Form_pg_constraint) GETSTRUCT(tup);
2072 
2073 	if (conForm->contype != CONSTRAINT_FOREIGN) /* should not happen */
2074 		elog(ERROR, "constraint %u is not a foreign key constraint",
2075 			 constraintOid);
2076 
2077 	/* And extract data */
2078 	Assert(riinfo->constraint_id == constraintOid);
2079 	if (OidIsValid(conForm->conparentid))
2080 		riinfo->constraint_root_id =
2081 			get_ri_constraint_root(conForm->conparentid);
2082 	else
2083 		riinfo->constraint_root_id = constraintOid;
2084 	riinfo->oidHashValue = GetSysCacheHashValue1(CONSTROID,
2085 												 ObjectIdGetDatum(constraintOid));
2086 	riinfo->rootHashValue = GetSysCacheHashValue1(CONSTROID,
2087 												  ObjectIdGetDatum(riinfo->constraint_root_id));
2088 	memcpy(&riinfo->conname, &conForm->conname, sizeof(NameData));
2089 	riinfo->pk_relid = conForm->confrelid;
2090 	riinfo->fk_relid = conForm->conrelid;
2091 	riinfo->confupdtype = conForm->confupdtype;
2092 	riinfo->confdeltype = conForm->confdeltype;
2093 	riinfo->confmatchtype = conForm->confmatchtype;
2094 
2095 	DeconstructFkConstraintRow(tup,
2096 							   &riinfo->nkeys,
2097 							   riinfo->fk_attnums,
2098 							   riinfo->pk_attnums,
2099 							   riinfo->pf_eq_oprs,
2100 							   riinfo->pp_eq_oprs,
2101 							   riinfo->ff_eq_oprs);
2102 
2103 	ReleaseSysCache(tup);
2104 
2105 	/*
2106 	 * For efficient processing of invalidation messages below, we keep a
2107 	 * doubly-linked list, and a count, of all currently valid entries.
2108 	 */
2109 	dlist_push_tail(&ri_constraint_cache_valid_list, &riinfo->valid_link);
2110 	ri_constraint_cache_valid_count++;
2111 
2112 	riinfo->valid = true;
2113 
2114 	return riinfo;
2115 }
2116 
2117 /*
2118  * get_ri_constraint_root
2119  *		Returns the OID of the constraint's root parent
2120  */
2121 static Oid
get_ri_constraint_root(Oid constrOid)2122 get_ri_constraint_root(Oid constrOid)
2123 {
2124 	for (;;)
2125 	{
2126 		HeapTuple	tuple;
2127 		Oid			constrParentOid;
2128 
2129 		tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid));
2130 		if (!HeapTupleIsValid(tuple))
2131 			elog(ERROR, "cache lookup failed for constraint %u", constrOid);
2132 		constrParentOid = ((Form_pg_constraint) GETSTRUCT(tuple))->conparentid;
2133 		ReleaseSysCache(tuple);
2134 		if (!OidIsValid(constrParentOid))
2135 			break;				/* we reached the root constraint */
2136 		constrOid = constrParentOid;
2137 	}
2138 	return constrOid;
2139 }
2140 
2141 /*
2142  * Callback for pg_constraint inval events
2143  *
2144  * While most syscache callbacks just flush all their entries, pg_constraint
2145  * gets enough update traffic that it's probably worth being smarter.
2146  * Invalidate any ri_constraint_cache entry associated with the syscache
2147  * entry with the specified hash value, or all entries if hashvalue == 0.
2148  *
2149  * Note: at the time a cache invalidation message is processed there may be
2150  * active references to the cache.  Because of this we never remove entries
2151  * from the cache, but only mark them invalid, which is harmless to active
2152  * uses.  (Any query using an entry should hold a lock sufficient to keep that
2153  * data from changing under it --- but we may get cache flushes anyway.)
2154  */
2155 static void
InvalidateConstraintCacheCallBack(Datum arg,int cacheid,uint32 hashvalue)2156 InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue)
2157 {
2158 	dlist_mutable_iter iter;
2159 
2160 	Assert(ri_constraint_cache != NULL);
2161 
2162 	/*
2163 	 * If the list of currently valid entries gets excessively large, we mark
2164 	 * them all invalid so we can empty the list.  This arrangement avoids
2165 	 * O(N^2) behavior in situations where a session touches many foreign keys
2166 	 * and also does many ALTER TABLEs, such as a restore from pg_dump.
2167 	 */
2168 	if (ri_constraint_cache_valid_count > 1000)
2169 		hashvalue = 0;			/* pretend it's a cache reset */
2170 
2171 	dlist_foreach_modify(iter, &ri_constraint_cache_valid_list)
2172 	{
2173 		RI_ConstraintInfo *riinfo = dlist_container(RI_ConstraintInfo,
2174 													valid_link, iter.cur);
2175 
2176 		/*
2177 		 * We must invalidate not only entries directly matching the given
2178 		 * hash value, but also child entries, in case the invalidation
2179 		 * affects a root constraint.
2180 		 */
2181 		if (hashvalue == 0 ||
2182 			riinfo->oidHashValue == hashvalue ||
2183 			riinfo->rootHashValue == hashvalue)
2184 		{
2185 			riinfo->valid = false;
2186 			/* Remove invalidated entries from the list, too */
2187 			dlist_delete(iter.cur);
2188 			ri_constraint_cache_valid_count--;
2189 		}
2190 	}
2191 }
2192 
2193 
2194 /*
2195  * Prepare execution plan for a query to enforce an RI restriction
2196  */
2197 static SPIPlanPtr
ri_PlanCheck(const char * querystr,int nargs,Oid * argtypes,RI_QueryKey * qkey,Relation fk_rel,Relation pk_rel)2198 ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
2199 			 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
2200 {
2201 	SPIPlanPtr	qplan;
2202 	Relation	query_rel;
2203 	Oid			save_userid;
2204 	int			save_sec_context;
2205 
2206 	/*
2207 	 * Use the query type code to determine whether the query is run against
2208 	 * the PK or FK table; we'll do the check as that table's owner
2209 	 */
2210 	if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
2211 		query_rel = pk_rel;
2212 	else
2213 		query_rel = fk_rel;
2214 
2215 	/* Switch to proper UID to perform check as */
2216 	GetUserIdAndSecContext(&save_userid, &save_sec_context);
2217 	SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
2218 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
2219 						   SECURITY_NOFORCE_RLS);
2220 
2221 	/* Create the plan */
2222 	qplan = SPI_prepare(querystr, nargs, argtypes);
2223 
2224 	if (qplan == NULL)
2225 		elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), querystr);
2226 
2227 	/* Restore UID and security context */
2228 	SetUserIdAndSecContext(save_userid, save_sec_context);
2229 
2230 	/* Save the plan */
2231 	SPI_keepplan(qplan);
2232 	ri_HashPreparedPlan(qkey, qplan);
2233 
2234 	return qplan;
2235 }
2236 
2237 /*
2238  * Perform a query to enforce an RI restriction
2239  */
2240 static bool
ri_PerformCheck(const RI_ConstraintInfo * riinfo,RI_QueryKey * qkey,SPIPlanPtr qplan,Relation fk_rel,Relation pk_rel,TupleTableSlot * oldslot,TupleTableSlot * newslot,bool detectNewRows,int expect_OK)2241 ri_PerformCheck(const RI_ConstraintInfo *riinfo,
2242 				RI_QueryKey *qkey, SPIPlanPtr qplan,
2243 				Relation fk_rel, Relation pk_rel,
2244 				TupleTableSlot *oldslot, TupleTableSlot *newslot,
2245 				bool detectNewRows, int expect_OK)
2246 {
2247 	Relation	query_rel,
2248 				source_rel;
2249 	bool		source_is_pk;
2250 	Snapshot	test_snapshot;
2251 	Snapshot	crosscheck_snapshot;
2252 	int			limit;
2253 	int			spi_result;
2254 	Oid			save_userid;
2255 	int			save_sec_context;
2256 	Datum		vals[RI_MAX_NUMKEYS * 2];
2257 	char		nulls[RI_MAX_NUMKEYS * 2];
2258 
2259 	/*
2260 	 * Use the query type code to determine whether the query is run against
2261 	 * the PK or FK table; we'll do the check as that table's owner
2262 	 */
2263 	if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
2264 		query_rel = pk_rel;
2265 	else
2266 		query_rel = fk_rel;
2267 
2268 	/*
2269 	 * The values for the query are taken from the table on which the trigger
2270 	 * is called - it is normally the other one with respect to query_rel. An
2271 	 * exception is ri_Check_Pk_Match(), which uses the PK table for both (and
2272 	 * sets queryno to RI_PLAN_CHECK_LOOKUPPK_FROM_PK).  We might eventually
2273 	 * need some less klugy way to determine this.
2274 	 */
2275 	if (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)
2276 	{
2277 		source_rel = fk_rel;
2278 		source_is_pk = false;
2279 	}
2280 	else
2281 	{
2282 		source_rel = pk_rel;
2283 		source_is_pk = true;
2284 	}
2285 
2286 	/* Extract the parameters to be passed into the query */
2287 	if (newslot)
2288 	{
2289 		ri_ExtractValues(source_rel, newslot, riinfo, source_is_pk,
2290 						 vals, nulls);
2291 		if (oldslot)
2292 			ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
2293 							 vals + riinfo->nkeys, nulls + riinfo->nkeys);
2294 	}
2295 	else
2296 	{
2297 		ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
2298 						 vals, nulls);
2299 	}
2300 
2301 	/*
2302 	 * In READ COMMITTED mode, we just need to use an up-to-date regular
2303 	 * snapshot, and we will see all rows that could be interesting. But in
2304 	 * transaction-snapshot mode, we can't change the transaction snapshot. If
2305 	 * the caller passes detectNewRows == false then it's okay to do the query
2306 	 * with the transaction snapshot; otherwise we use a current snapshot, and
2307 	 * tell the executor to error out if it finds any rows under the current
2308 	 * snapshot that wouldn't be visible per the transaction snapshot.  Note
2309 	 * that SPI_execute_snapshot will register the snapshots, so we don't need
2310 	 * to bother here.
2311 	 */
2312 	if (IsolationUsesXactSnapshot() && detectNewRows)
2313 	{
2314 		CommandCounterIncrement();	/* be sure all my own work is visible */
2315 		test_snapshot = GetLatestSnapshot();
2316 		crosscheck_snapshot = GetTransactionSnapshot();
2317 	}
2318 	else
2319 	{
2320 		/* the default SPI behavior is okay */
2321 		test_snapshot = InvalidSnapshot;
2322 		crosscheck_snapshot = InvalidSnapshot;
2323 	}
2324 
2325 	/*
2326 	 * If this is a select query (e.g., for a 'no action' or 'restrict'
2327 	 * trigger), we only need to see if there is a single row in the table,
2328 	 * matching the key.  Otherwise, limit = 0 - because we want the query to
2329 	 * affect ALL the matching rows.
2330 	 */
2331 	limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0;
2332 
2333 	/* Switch to proper UID to perform check as */
2334 	GetUserIdAndSecContext(&save_userid, &save_sec_context);
2335 	SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
2336 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
2337 						   SECURITY_NOFORCE_RLS);
2338 
2339 	/* Finally we can run the query. */
2340 	spi_result = SPI_execute_snapshot(qplan,
2341 									  vals, nulls,
2342 									  test_snapshot, crosscheck_snapshot,
2343 									  false, false, limit);
2344 
2345 	/* Restore UID and security context */
2346 	SetUserIdAndSecContext(save_userid, save_sec_context);
2347 
2348 	/* Check result */
2349 	if (spi_result < 0)
2350 		elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
2351 
2352 	if (expect_OK >= 0 && spi_result != expect_OK)
2353 		ereport(ERROR,
2354 				(errcode(ERRCODE_INTERNAL_ERROR),
2355 				 errmsg("referential integrity query on \"%s\" from constraint \"%s\" on \"%s\" gave unexpected result",
2356 						RelationGetRelationName(pk_rel),
2357 						NameStr(riinfo->conname),
2358 						RelationGetRelationName(fk_rel)),
2359 				 errhint("This is most likely due to a rule having rewritten the query.")));
2360 
2361 	/* XXX wouldn't it be clearer to do this part at the caller? */
2362 	if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
2363 		expect_OK == SPI_OK_SELECT &&
2364 		(SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
2365 		ri_ReportViolation(riinfo,
2366 						   pk_rel, fk_rel,
2367 						   newslot ? newslot : oldslot,
2368 						   NULL,
2369 						   qkey->constr_queryno, false);
2370 
2371 	return SPI_processed != 0;
2372 }
2373 
2374 /*
2375  * Extract fields from a tuple into Datum/nulls arrays
2376  */
2377 static void
ri_ExtractValues(Relation rel,TupleTableSlot * slot,const RI_ConstraintInfo * riinfo,bool rel_is_pk,Datum * vals,char * nulls)2378 ri_ExtractValues(Relation rel, TupleTableSlot *slot,
2379 				 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
2380 				 Datum *vals, char *nulls)
2381 {
2382 	const int16 *attnums;
2383 	bool		isnull;
2384 
2385 	if (rel_is_pk)
2386 		attnums = riinfo->pk_attnums;
2387 	else
2388 		attnums = riinfo->fk_attnums;
2389 
2390 	for (int i = 0; i < riinfo->nkeys; i++)
2391 	{
2392 		vals[i] = slot_getattr(slot, attnums[i], &isnull);
2393 		nulls[i] = isnull ? 'n' : ' ';
2394 	}
2395 }
2396 
2397 /*
2398  * Produce an error report
2399  *
2400  * If the failed constraint was on insert/update to the FK table,
2401  * we want the key names and values extracted from there, and the error
2402  * message to look like 'key blah is not present in PK'.
2403  * Otherwise, the attr names and values come from the PK table and the
2404  * message looks like 'key blah is still referenced from FK'.
2405  */
2406 static void
ri_ReportViolation(const RI_ConstraintInfo * riinfo,Relation pk_rel,Relation fk_rel,TupleTableSlot * violatorslot,TupleDesc tupdesc,int queryno,bool partgone)2407 ri_ReportViolation(const RI_ConstraintInfo *riinfo,
2408 				   Relation pk_rel, Relation fk_rel,
2409 				   TupleTableSlot *violatorslot, TupleDesc tupdesc,
2410 				   int queryno, bool partgone)
2411 {
2412 	StringInfoData key_names;
2413 	StringInfoData key_values;
2414 	bool		onfk;
2415 	const int16 *attnums;
2416 	Oid			rel_oid;
2417 	AclResult	aclresult;
2418 	bool		has_perm = true;
2419 
2420 	/*
2421 	 * Determine which relation to complain about.  If tupdesc wasn't passed
2422 	 * by caller, assume the violator tuple came from there.
2423 	 */
2424 	onfk = (queryno == RI_PLAN_CHECK_LOOKUPPK);
2425 	if (onfk)
2426 	{
2427 		attnums = riinfo->fk_attnums;
2428 		rel_oid = fk_rel->rd_id;
2429 		if (tupdesc == NULL)
2430 			tupdesc = fk_rel->rd_att;
2431 	}
2432 	else
2433 	{
2434 		attnums = riinfo->pk_attnums;
2435 		rel_oid = pk_rel->rd_id;
2436 		if (tupdesc == NULL)
2437 			tupdesc = pk_rel->rd_att;
2438 	}
2439 
2440 	/*
2441 	 * Check permissions- if the user does not have access to view the data in
2442 	 * any of the key columns then we don't include the errdetail() below.
2443 	 *
2444 	 * Check if RLS is enabled on the relation first.  If so, we don't return
2445 	 * any specifics to avoid leaking data.
2446 	 *
2447 	 * Check table-level permissions next and, failing that, column-level
2448 	 * privileges.
2449 	 *
2450 	 * When a partition at the referenced side is being detached/dropped, we
2451 	 * needn't check, since the user must be the table owner anyway.
2452 	 */
2453 	if (partgone)
2454 		has_perm = true;
2455 	else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
2456 	{
2457 		aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT);
2458 		if (aclresult != ACLCHECK_OK)
2459 		{
2460 			/* Try for column-level permissions */
2461 			for (int idx = 0; idx < riinfo->nkeys; idx++)
2462 			{
2463 				aclresult = pg_attribute_aclcheck(rel_oid, attnums[idx],
2464 												  GetUserId(),
2465 												  ACL_SELECT);
2466 
2467 				/* No access to the key */
2468 				if (aclresult != ACLCHECK_OK)
2469 				{
2470 					has_perm = false;
2471 					break;
2472 				}
2473 			}
2474 		}
2475 	}
2476 	else
2477 		has_perm = false;
2478 
2479 	if (has_perm)
2480 	{
2481 		/* Get printable versions of the keys involved */
2482 		initStringInfo(&key_names);
2483 		initStringInfo(&key_values);
2484 		for (int idx = 0; idx < riinfo->nkeys; idx++)
2485 		{
2486 			int			fnum = attnums[idx];
2487 			Form_pg_attribute att = TupleDescAttr(tupdesc, fnum - 1);
2488 			char	   *name,
2489 					   *val;
2490 			Datum		datum;
2491 			bool		isnull;
2492 
2493 			name = NameStr(att->attname);
2494 
2495 			datum = slot_getattr(violatorslot, fnum, &isnull);
2496 			if (!isnull)
2497 			{
2498 				Oid			foutoid;
2499 				bool		typisvarlena;
2500 
2501 				getTypeOutputInfo(att->atttypid, &foutoid, &typisvarlena);
2502 				val = OidOutputFunctionCall(foutoid, datum);
2503 			}
2504 			else
2505 				val = "null";
2506 
2507 			if (idx > 0)
2508 			{
2509 				appendStringInfoString(&key_names, ", ");
2510 				appendStringInfoString(&key_values, ", ");
2511 			}
2512 			appendStringInfoString(&key_names, name);
2513 			appendStringInfoString(&key_values, val);
2514 		}
2515 	}
2516 
2517 	if (partgone)
2518 		ereport(ERROR,
2519 				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
2520 				 errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"",
2521 						RelationGetRelationName(pk_rel),
2522 						NameStr(riinfo->conname)),
2523 				 errdetail("Key (%s)=(%s) is still referenced from table \"%s\".",
2524 						   key_names.data, key_values.data,
2525 						   RelationGetRelationName(fk_rel)),
2526 				 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
2527 	else if (onfk)
2528 		ereport(ERROR,
2529 				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
2530 				 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
2531 						RelationGetRelationName(fk_rel),
2532 						NameStr(riinfo->conname)),
2533 				 has_perm ?
2534 				 errdetail("Key (%s)=(%s) is not present in table \"%s\".",
2535 						   key_names.data, key_values.data,
2536 						   RelationGetRelationName(pk_rel)) :
2537 				 errdetail("Key is not present in table \"%s\".",
2538 						   RelationGetRelationName(pk_rel)),
2539 				 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
2540 	else
2541 		ereport(ERROR,
2542 				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
2543 				 errmsg("update or delete on table \"%s\" violates foreign key constraint \"%s\" on table \"%s\"",
2544 						RelationGetRelationName(pk_rel),
2545 						NameStr(riinfo->conname),
2546 						RelationGetRelationName(fk_rel)),
2547 				 has_perm ?
2548 				 errdetail("Key (%s)=(%s) is still referenced from table \"%s\".",
2549 						   key_names.data, key_values.data,
2550 						   RelationGetRelationName(fk_rel)) :
2551 				 errdetail("Key is still referenced from table \"%s\".",
2552 						   RelationGetRelationName(fk_rel)),
2553 				 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
2554 }
2555 
2556 
2557 /*
2558  * ri_NullCheck -
2559  *
2560  * Determine the NULL state of all key values in a tuple
2561  *
2562  * Returns one of RI_KEYS_ALL_NULL, RI_KEYS_NONE_NULL or RI_KEYS_SOME_NULL.
2563  */
2564 static int
ri_NullCheck(TupleDesc tupDesc,TupleTableSlot * slot,const RI_ConstraintInfo * riinfo,bool rel_is_pk)2565 ri_NullCheck(TupleDesc tupDesc,
2566 			 TupleTableSlot *slot,
2567 			 const RI_ConstraintInfo *riinfo, bool rel_is_pk)
2568 {
2569 	const int16 *attnums;
2570 	bool		allnull = true;
2571 	bool		nonenull = true;
2572 
2573 	if (rel_is_pk)
2574 		attnums = riinfo->pk_attnums;
2575 	else
2576 		attnums = riinfo->fk_attnums;
2577 
2578 	for (int i = 0; i < riinfo->nkeys; i++)
2579 	{
2580 		if (slot_attisnull(slot, attnums[i]))
2581 			nonenull = false;
2582 		else
2583 			allnull = false;
2584 	}
2585 
2586 	if (allnull)
2587 		return RI_KEYS_ALL_NULL;
2588 
2589 	if (nonenull)
2590 		return RI_KEYS_NONE_NULL;
2591 
2592 	return RI_KEYS_SOME_NULL;
2593 }
2594 
2595 
2596 /*
2597  * ri_InitHashTables -
2598  *
2599  * Initialize our internal hash tables.
2600  */
2601 static void
ri_InitHashTables(void)2602 ri_InitHashTables(void)
2603 {
2604 	HASHCTL		ctl;
2605 
2606 	ctl.keysize = sizeof(Oid);
2607 	ctl.entrysize = sizeof(RI_ConstraintInfo);
2608 	ri_constraint_cache = hash_create("RI constraint cache",
2609 									  RI_INIT_CONSTRAINTHASHSIZE,
2610 									  &ctl, HASH_ELEM | HASH_BLOBS);
2611 
2612 	/* Arrange to flush cache on pg_constraint changes */
2613 	CacheRegisterSyscacheCallback(CONSTROID,
2614 								  InvalidateConstraintCacheCallBack,
2615 								  (Datum) 0);
2616 
2617 	ctl.keysize = sizeof(RI_QueryKey);
2618 	ctl.entrysize = sizeof(RI_QueryHashEntry);
2619 	ri_query_cache = hash_create("RI query cache",
2620 								 RI_INIT_QUERYHASHSIZE,
2621 								 &ctl, HASH_ELEM | HASH_BLOBS);
2622 
2623 	ctl.keysize = sizeof(RI_CompareKey);
2624 	ctl.entrysize = sizeof(RI_CompareHashEntry);
2625 	ri_compare_cache = hash_create("RI compare cache",
2626 								   RI_INIT_QUERYHASHSIZE,
2627 								   &ctl, HASH_ELEM | HASH_BLOBS);
2628 }
2629 
2630 
2631 /*
2632  * ri_FetchPreparedPlan -
2633  *
2634  * Lookup for a query key in our private hash table of prepared
2635  * and saved SPI execution plans. Return the plan if found or NULL.
2636  */
2637 static SPIPlanPtr
ri_FetchPreparedPlan(RI_QueryKey * key)2638 ri_FetchPreparedPlan(RI_QueryKey *key)
2639 {
2640 	RI_QueryHashEntry *entry;
2641 	SPIPlanPtr	plan;
2642 
2643 	/*
2644 	 * On the first call initialize the hashtable
2645 	 */
2646 	if (!ri_query_cache)
2647 		ri_InitHashTables();
2648 
2649 	/*
2650 	 * Lookup for the key
2651 	 */
2652 	entry = (RI_QueryHashEntry *) hash_search(ri_query_cache,
2653 											  (void *) key,
2654 											  HASH_FIND, NULL);
2655 	if (entry == NULL)
2656 		return NULL;
2657 
2658 	/*
2659 	 * Check whether the plan is still valid.  If it isn't, we don't want to
2660 	 * simply rely on plancache.c to regenerate it; rather we should start
2661 	 * from scratch and rebuild the query text too.  This is to cover cases
2662 	 * such as table/column renames.  We depend on the plancache machinery to
2663 	 * detect possible invalidations, though.
2664 	 *
2665 	 * CAUTION: this check is only trustworthy if the caller has already
2666 	 * locked both FK and PK rels.
2667 	 */
2668 	plan = entry->plan;
2669 	if (plan && SPI_plan_is_valid(plan))
2670 		return plan;
2671 
2672 	/*
2673 	 * Otherwise we might as well flush the cached plan now, to free a little
2674 	 * memory space before we make a new one.
2675 	 */
2676 	entry->plan = NULL;
2677 	if (plan)
2678 		SPI_freeplan(plan);
2679 
2680 	return NULL;
2681 }
2682 
2683 
2684 /*
2685  * ri_HashPreparedPlan -
2686  *
2687  * Add another plan to our private SPI query plan hashtable.
2688  */
2689 static void
ri_HashPreparedPlan(RI_QueryKey * key,SPIPlanPtr plan)2690 ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan)
2691 {
2692 	RI_QueryHashEntry *entry;
2693 	bool		found;
2694 
2695 	/*
2696 	 * On the first call initialize the hashtable
2697 	 */
2698 	if (!ri_query_cache)
2699 		ri_InitHashTables();
2700 
2701 	/*
2702 	 * Add the new plan.  We might be overwriting an entry previously found
2703 	 * invalid by ri_FetchPreparedPlan.
2704 	 */
2705 	entry = (RI_QueryHashEntry *) hash_search(ri_query_cache,
2706 											  (void *) key,
2707 											  HASH_ENTER, &found);
2708 	Assert(!found || entry->plan == NULL);
2709 	entry->plan = plan;
2710 }
2711 
2712 
2713 /*
2714  * ri_KeysEqual -
2715  *
2716  * Check if all key values in OLD and NEW are equal.
2717  *
2718  * Note: at some point we might wish to redefine this as checking for
2719  * "IS NOT DISTINCT" rather than "=", that is, allow two nulls to be
2720  * considered equal.  Currently there is no need since all callers have
2721  * previously found at least one of the rows to contain no nulls.
2722  */
2723 static bool
ri_KeysEqual(Relation rel,TupleTableSlot * oldslot,TupleTableSlot * newslot,const RI_ConstraintInfo * riinfo,bool rel_is_pk)2724 ri_KeysEqual(Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot,
2725 			 const RI_ConstraintInfo *riinfo, bool rel_is_pk)
2726 {
2727 	const int16 *attnums;
2728 
2729 	if (rel_is_pk)
2730 		attnums = riinfo->pk_attnums;
2731 	else
2732 		attnums = riinfo->fk_attnums;
2733 
2734 	/* XXX: could be worthwhile to fetch all necessary attrs at once */
2735 	for (int i = 0; i < riinfo->nkeys; i++)
2736 	{
2737 		Datum		oldvalue;
2738 		Datum		newvalue;
2739 		bool		isnull;
2740 
2741 		/*
2742 		 * Get one attribute's oldvalue. If it is NULL - they're not equal.
2743 		 */
2744 		oldvalue = slot_getattr(oldslot, attnums[i], &isnull);
2745 		if (isnull)
2746 			return false;
2747 
2748 		/*
2749 		 * Get one attribute's newvalue. If it is NULL - they're not equal.
2750 		 */
2751 		newvalue = slot_getattr(newslot, attnums[i], &isnull);
2752 		if (isnull)
2753 			return false;
2754 
2755 		if (rel_is_pk)
2756 		{
2757 			/*
2758 			 * If we are looking at the PK table, then do a bytewise
2759 			 * comparison.  We must propagate PK changes if the value is
2760 			 * changed to one that "looks" different but would compare as
2761 			 * equal using the equality operator.  This only makes a
2762 			 * difference for ON UPDATE CASCADE, but for consistency we treat
2763 			 * all changes to the PK the same.
2764 			 */
2765 			Form_pg_attribute att = TupleDescAttr(oldslot->tts_tupleDescriptor, attnums[i] - 1);
2766 
2767 			if (!datum_image_eq(oldvalue, newvalue, att->attbyval, att->attlen))
2768 				return false;
2769 		}
2770 		else
2771 		{
2772 			/*
2773 			 * For the FK table, compare with the appropriate equality
2774 			 * operator.  Changes that compare equal will still satisfy the
2775 			 * constraint after the update.
2776 			 */
2777 			if (!ri_AttributesEqual(riinfo->ff_eq_oprs[i], RIAttType(rel, attnums[i]),
2778 									oldvalue, newvalue))
2779 				return false;
2780 		}
2781 	}
2782 
2783 	return true;
2784 }
2785 
2786 
2787 /*
2788  * ri_AttributesEqual -
2789  *
2790  * Call the appropriate equality comparison operator for two values.
2791  *
2792  * NB: we have already checked that neither value is null.
2793  */
2794 static bool
ri_AttributesEqual(Oid eq_opr,Oid typeid,Datum oldvalue,Datum newvalue)2795 ri_AttributesEqual(Oid eq_opr, Oid typeid,
2796 				   Datum oldvalue, Datum newvalue)
2797 {
2798 	RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
2799 
2800 	/* Do we need to cast the values? */
2801 	if (OidIsValid(entry->cast_func_finfo.fn_oid))
2802 	{
2803 		oldvalue = FunctionCall3(&entry->cast_func_finfo,
2804 								 oldvalue,
2805 								 Int32GetDatum(-1), /* typmod */
2806 								 BoolGetDatum(false));	/* implicit coercion */
2807 		newvalue = FunctionCall3(&entry->cast_func_finfo,
2808 								 newvalue,
2809 								 Int32GetDatum(-1), /* typmod */
2810 								 BoolGetDatum(false));	/* implicit coercion */
2811 	}
2812 
2813 	/*
2814 	 * Apply the comparison operator.
2815 	 *
2816 	 * Note: This function is part of a call stack that determines whether an
2817 	 * update to a row is significant enough that it needs checking or action
2818 	 * on the other side of a foreign-key constraint.  Therefore, the
2819 	 * comparison here would need to be done with the collation of the *other*
2820 	 * table.  For simplicity (e.g., we might not even have the other table
2821 	 * open), we'll just use the default collation here, which could lead to
2822 	 * some false negatives.  All this would break if we ever allow
2823 	 * database-wide collations to be nondeterministic.
2824 	 */
2825 	return DatumGetBool(FunctionCall2Coll(&entry->eq_opr_finfo,
2826 										  DEFAULT_COLLATION_OID,
2827 										  oldvalue, newvalue));
2828 }
2829 
2830 /*
2831  * ri_HashCompareOp -
2832  *
2833  * See if we know how to compare two values, and create a new hash entry
2834  * if not.
2835  */
2836 static RI_CompareHashEntry *
ri_HashCompareOp(Oid eq_opr,Oid typeid)2837 ri_HashCompareOp(Oid eq_opr, Oid typeid)
2838 {
2839 	RI_CompareKey key;
2840 	RI_CompareHashEntry *entry;
2841 	bool		found;
2842 
2843 	/*
2844 	 * On the first call initialize the hashtable
2845 	 */
2846 	if (!ri_compare_cache)
2847 		ri_InitHashTables();
2848 
2849 	/*
2850 	 * Find or create a hash entry.  Note we're assuming RI_CompareKey
2851 	 * contains no struct padding.
2852 	 */
2853 	key.eq_opr = eq_opr;
2854 	key.typeid = typeid;
2855 	entry = (RI_CompareHashEntry *) hash_search(ri_compare_cache,
2856 												(void *) &key,
2857 												HASH_ENTER, &found);
2858 	if (!found)
2859 		entry->valid = false;
2860 
2861 	/*
2862 	 * If not already initialized, do so.  Since we'll keep this hash entry
2863 	 * for the life of the backend, put any subsidiary info for the function
2864 	 * cache structs into TopMemoryContext.
2865 	 */
2866 	if (!entry->valid)
2867 	{
2868 		Oid			lefttype,
2869 					righttype,
2870 					castfunc;
2871 		CoercionPathType pathtype;
2872 
2873 		/* We always need to know how to call the equality operator */
2874 		fmgr_info_cxt(get_opcode(eq_opr), &entry->eq_opr_finfo,
2875 					  TopMemoryContext);
2876 
2877 		/*
2878 		 * If we chose to use a cast from FK to PK type, we may have to apply
2879 		 * the cast function to get to the operator's input type.
2880 		 *
2881 		 * XXX eventually it would be good to support array-coercion cases
2882 		 * here and in ri_AttributesEqual().  At the moment there is no point
2883 		 * because cases involving nonidentical array types will be rejected
2884 		 * at constraint creation time.
2885 		 *
2886 		 * XXX perhaps also consider supporting CoerceViaIO?  No need at the
2887 		 * moment since that will never be generated for implicit coercions.
2888 		 */
2889 		op_input_types(eq_opr, &lefttype, &righttype);
2890 		Assert(lefttype == righttype);
2891 		if (typeid == lefttype)
2892 			castfunc = InvalidOid;	/* simplest case */
2893 		else
2894 		{
2895 			pathtype = find_coercion_pathway(lefttype, typeid,
2896 											 COERCION_IMPLICIT,
2897 											 &castfunc);
2898 			if (pathtype != COERCION_PATH_FUNC &&
2899 				pathtype != COERCION_PATH_RELABELTYPE)
2900 			{
2901 				/*
2902 				 * The declared input type of the eq_opr might be a
2903 				 * polymorphic type such as ANYARRAY or ANYENUM, or other
2904 				 * special cases such as RECORD; find_coercion_pathway
2905 				 * currently doesn't subsume these special cases.
2906 				 */
2907 				if (!IsBinaryCoercible(typeid, lefttype))
2908 					elog(ERROR, "no conversion function from %s to %s",
2909 						 format_type_be(typeid),
2910 						 format_type_be(lefttype));
2911 			}
2912 		}
2913 		if (OidIsValid(castfunc))
2914 			fmgr_info_cxt(castfunc, &entry->cast_func_finfo,
2915 						  TopMemoryContext);
2916 		else
2917 			entry->cast_func_finfo.fn_oid = InvalidOid;
2918 		entry->valid = true;
2919 	}
2920 
2921 	return entry;
2922 }
2923 
2924 
2925 /*
2926  * Given a trigger function OID, determine whether it is an RI trigger,
2927  * and if so whether it is attached to PK or FK relation.
2928  */
2929 int
RI_FKey_trigger_type(Oid tgfoid)2930 RI_FKey_trigger_type(Oid tgfoid)
2931 {
2932 	switch (tgfoid)
2933 	{
2934 		case F_RI_FKEY_CASCADE_DEL:
2935 		case F_RI_FKEY_CASCADE_UPD:
2936 		case F_RI_FKEY_RESTRICT_DEL:
2937 		case F_RI_FKEY_RESTRICT_UPD:
2938 		case F_RI_FKEY_SETNULL_DEL:
2939 		case F_RI_FKEY_SETNULL_UPD:
2940 		case F_RI_FKEY_SETDEFAULT_DEL:
2941 		case F_RI_FKEY_SETDEFAULT_UPD:
2942 		case F_RI_FKEY_NOACTION_DEL:
2943 		case F_RI_FKEY_NOACTION_UPD:
2944 			return RI_TRIGGER_PK;
2945 
2946 		case F_RI_FKEY_CHECK_INS:
2947 		case F_RI_FKEY_CHECK_UPD:
2948 			return RI_TRIGGER_FK;
2949 	}
2950 
2951 	return RI_TRIGGER_NONE;
2952 }
2953