1 /*-------------------------------------------------------------------------
2 *
3 * ri_triggers.c
4 *
5 * Generic trigger procedures for referential integrity constraint
6 * checks.
7 *
8 * Note about memory management: the private hashtables kept here live
9 * across query and transaction boundaries, in fact they live as long as
10 * the backend does. This works because the hashtable structures
11 * themselves are allocated by dynahash.c in its permanent DynaHashCxt,
12 * and the SPI plans they point to are saved using SPI_keepplan().
13 * There is not currently any provision for throwing away a no-longer-needed
14 * plan --- consider improving this someday.
15 *
16 *
17 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
18 *
19 * src/backend/utils/adt/ri_triggers.c
20 *
21 *-------------------------------------------------------------------------
22 */
23
24 #include "postgres.h"
25
26 #include "access/htup_details.h"
27 #include "access/sysattr.h"
28 #include "access/table.h"
29 #include "access/tableam.h"
30 #include "access/xact.h"
31 #include "catalog/pg_collation.h"
32 #include "catalog/pg_constraint.h"
33 #include "catalog/pg_operator.h"
34 #include "catalog/pg_type.h"
35 #include "commands/trigger.h"
36 #include "executor/executor.h"
37 #include "executor/spi.h"
38 #include "lib/ilist.h"
39 #include "miscadmin.h"
40 #include "parser/parse_coerce.h"
41 #include "parser/parse_relation.h"
42 #include "storage/bufmgr.h"
43 #include "utils/acl.h"
44 #include "utils/builtins.h"
45 #include "utils/datum.h"
46 #include "utils/fmgroids.h"
47 #include "utils/guc.h"
48 #include "utils/inval.h"
49 #include "utils/lsyscache.h"
50 #include "utils/memutils.h"
51 #include "utils/rel.h"
52 #include "utils/rls.h"
53 #include "utils/ruleutils.h"
54 #include "utils/snapmgr.h"
55 #include "utils/syscache.h"
56
/*
 * Local definitions
 */

/* Upper bound on key columns; sizes the per-key arrays in RI_ConstraintInfo */
#define RI_MAX_NUMKEYS					INDEX_MAX_KEYS

/*
 * Initial hash table sizes (presumably consumed by ri_InitHashTables, which
 * is not part of this section -- confirm there).
 */
#define RI_INIT_CONSTRAINTHASHSIZE		64
#define RI_INIT_QUERYHASHSIZE			(RI_INIT_CONSTRAINTHASHSIZE * 4)

/* Possible results of ri_NullCheck() */
#define RI_KEYS_ALL_NULL				0
#define RI_KEYS_SOME_NULL				1
#define RI_KEYS_NONE_NULL				2

/*
 * RI query type codes, used as RI_QueryKey.constr_queryno.
 *
 * Codes up to and including RI_PLAN_LAST_ON_PK identify queries that run
 * against the PK (referenced) table ...
 */
#define RI_PLAN_CHECK_LOOKUPPK			1
#define RI_PLAN_CHECK_LOOKUPPK_FROM_PK	2
#define RI_PLAN_LAST_ON_PK				RI_PLAN_CHECK_LOOKUPPK_FROM_PK
/* ... while these identify queries against the FK (referencing) table. */
#define RI_PLAN_CASCADE_DEL_DODELETE	3
#define RI_PLAN_CASCADE_UPD_DOUPDATE	4
#define RI_PLAN_RESTRICT_CHECKREF		5
#define RI_PLAN_SETNULL_DOUPDATE		6
#define RI_PLAN_SETDEFAULT_DOUPDATE		7

/* Buffer sizes for the outputs of quoteOneName() and quoteRelationName() */
#define MAX_QUOTED_NAME_LEN				(NAMEDATALEN*2+3)
#define MAX_QUOTED_REL_NAME_LEN			(MAX_QUOTED_NAME_LEN*2)

/* Shorthands for looking up a column's name, type and collation by attnum */
#define RIAttName(rel, attnum)			NameStr(*attnumAttName(rel, attnum))
#define RIAttType(rel, attnum)			attnumTypeId(rel, attnum)
#define RIAttCollation(rel, attnum)		attnumCollationId(rel, attnum)

/* Trigger event kinds, passed as "tgkind" to ri_CheckTrigger() */
#define RI_TRIGTYPE_INSERT				1
#define RI_TRIGTYPE_UPDATE				2
#define RI_TRIGTYPE_DELETE				3
92
93
94 /*
95 * RI_ConstraintInfo
96 *
97 * Information extracted from an FK pg_constraint entry. This is cached in
98 * ri_constraint_cache.
99 */
100 typedef struct RI_ConstraintInfo
101 {
102 Oid constraint_id; /* OID of pg_constraint entry (hash key) */
103 bool valid; /* successfully initialized? */
104 Oid constraint_root_id; /* OID of topmost ancestor constraint;
105 * same as constraint_id if not inherited */
106 uint32 oidHashValue; /* hash value of constraint_id */
107 uint32 rootHashValue; /* hash value of constraint_root_id */
108 NameData conname; /* name of the FK constraint */
109 Oid pk_relid; /* referenced relation */
110 Oid fk_relid; /* referencing relation */
111 char confupdtype; /* foreign key's ON UPDATE action */
112 char confdeltype; /* foreign key's ON DELETE action */
113 char confmatchtype; /* foreign key's match type */
114 int nkeys; /* number of key columns */
115 int16 pk_attnums[RI_MAX_NUMKEYS]; /* attnums of referenced cols */
116 int16 fk_attnums[RI_MAX_NUMKEYS]; /* attnums of referencing cols */
117 Oid pf_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = FK) */
118 Oid pp_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = PK) */
119 Oid ff_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (FK = FK) */
120 dlist_node valid_link; /* Link in list of valid entries */
121 } RI_ConstraintInfo;
122
123 /*
124 * RI_QueryKey
125 *
126 * The key identifying a prepared SPI plan in our query hashtable
127 */
128 typedef struct RI_QueryKey
129 {
130 Oid constr_id; /* OID of pg_constraint entry */
131 int32 constr_queryno; /* query type ID, see RI_PLAN_XXX above */
132 } RI_QueryKey;
133
134 /*
135 * RI_QueryHashEntry
136 */
137 typedef struct RI_QueryHashEntry
138 {
139 RI_QueryKey key;
140 SPIPlanPtr plan;
141 } RI_QueryHashEntry;
142
143 /*
144 * RI_CompareKey
145 *
146 * The key identifying an entry showing how to compare two values
147 */
148 typedef struct RI_CompareKey
149 {
150 Oid eq_opr; /* the equality operator to apply */
151 Oid typeid; /* the data type to apply it to */
152 } RI_CompareKey;
153
154 /*
155 * RI_CompareHashEntry
156 */
157 typedef struct RI_CompareHashEntry
158 {
159 RI_CompareKey key;
160 bool valid; /* successfully initialized? */
161 FmgrInfo eq_opr_finfo; /* call info for equality fn */
162 FmgrInfo cast_func_finfo; /* in case we must coerce input */
163 } RI_CompareHashEntry;
164
165
166 /*
167 * Local data
168 */
169 static HTAB *ri_constraint_cache = NULL;
170 static HTAB *ri_query_cache = NULL;
171 static HTAB *ri_compare_cache = NULL;
172 static dlist_head ri_constraint_cache_valid_list;
173 static int ri_constraint_cache_valid_count = 0;
174
175
176 /*
177 * Local function prototypes
178 */
179 static bool ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
180 TupleTableSlot *oldslot,
181 const RI_ConstraintInfo *riinfo);
182 static Datum ri_restrict(TriggerData *trigdata, bool is_no_action);
183 static Datum ri_set(TriggerData *trigdata, bool is_set_null);
184 static void quoteOneName(char *buffer, const char *name);
185 static void quoteRelationName(char *buffer, Relation rel);
186 static void ri_GenerateQual(StringInfo buf,
187 const char *sep,
188 const char *leftop, Oid leftoptype,
189 Oid opoid,
190 const char *rightop, Oid rightoptype);
191 static void ri_GenerateQualCollation(StringInfo buf, Oid collation);
192 static int ri_NullCheck(TupleDesc tupdesc, TupleTableSlot *slot,
193 const RI_ConstraintInfo *riinfo, bool rel_is_pk);
194 static void ri_BuildQueryKey(RI_QueryKey *key,
195 const RI_ConstraintInfo *riinfo,
196 int32 constr_queryno);
197 static bool ri_KeysEqual(Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot,
198 const RI_ConstraintInfo *riinfo, bool rel_is_pk);
199 static bool ri_AttributesEqual(Oid eq_opr, Oid typeid,
200 Datum oldvalue, Datum newvalue);
201
202 static void ri_InitHashTables(void);
203 static void InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue);
204 static SPIPlanPtr ri_FetchPreparedPlan(RI_QueryKey *key);
205 static void ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan);
206 static RI_CompareHashEntry *ri_HashCompareOp(Oid eq_opr, Oid typeid);
207
208 static void ri_CheckTrigger(FunctionCallInfo fcinfo, const char *funcname,
209 int tgkind);
210 static const RI_ConstraintInfo *ri_FetchConstraintInfo(Trigger *trigger,
211 Relation trig_rel, bool rel_is_pk);
212 static const RI_ConstraintInfo *ri_LoadConstraintInfo(Oid constraintOid);
213 static Oid get_ri_constraint_root(Oid constrOid);
214 static SPIPlanPtr ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
215 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel);
216 static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo,
217 RI_QueryKey *qkey, SPIPlanPtr qplan,
218 Relation fk_rel, Relation pk_rel,
219 TupleTableSlot *oldslot, TupleTableSlot *newslot,
220 bool detectNewRows, int expect_OK);
221 static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
222 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
223 Datum *vals, char *nulls);
224 static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
225 Relation pk_rel, Relation fk_rel,
226 TupleTableSlot *violatorslot, TupleDesc tupdesc,
227 int queryno, bool partgone) pg_attribute_noreturn();
228
229
230 /*
231 * RI_FKey_check -
232 *
233 * Check foreign key existence (combined for INSERT and UPDATE).
234 */
235 static Datum
RI_FKey_check(TriggerData * trigdata)236 RI_FKey_check(TriggerData *trigdata)
237 {
238 const RI_ConstraintInfo *riinfo;
239 Relation fk_rel;
240 Relation pk_rel;
241 TupleTableSlot *newslot;
242 RI_QueryKey qkey;
243 SPIPlanPtr qplan;
244
245 riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
246 trigdata->tg_relation, false);
247
248 if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
249 newslot = trigdata->tg_newslot;
250 else
251 newslot = trigdata->tg_trigslot;
252
253 /*
254 * We should not even consider checking the row if it is no longer valid,
255 * since it was either deleted (so the deferred check should be skipped)
256 * or updated (in which case only the latest version of the row should be
257 * checked). Test its liveness according to SnapshotSelf. We need pin
258 * and lock on the buffer to call HeapTupleSatisfiesVisibility. Caller
259 * should be holding pin, but not lock.
260 */
261 if (!table_tuple_satisfies_snapshot(trigdata->tg_relation, newslot, SnapshotSelf))
262 return PointerGetDatum(NULL);
263
264 /*
265 * Get the relation descriptors of the FK and PK tables.
266 *
267 * pk_rel is opened in RowShareLock mode since that's what our eventual
268 * SELECT FOR KEY SHARE will get on it.
269 */
270 fk_rel = trigdata->tg_relation;
271 pk_rel = table_open(riinfo->pk_relid, RowShareLock);
272
273 switch (ri_NullCheck(RelationGetDescr(fk_rel), newslot, riinfo, false))
274 {
275 case RI_KEYS_ALL_NULL:
276
277 /*
278 * No further check needed - an all-NULL key passes every type of
279 * foreign key constraint.
280 */
281 table_close(pk_rel, RowShareLock);
282 return PointerGetDatum(NULL);
283
284 case RI_KEYS_SOME_NULL:
285
286 /*
287 * This is the only case that differs between the three kinds of
288 * MATCH.
289 */
290 switch (riinfo->confmatchtype)
291 {
292 case FKCONSTR_MATCH_FULL:
293
294 /*
295 * Not allowed - MATCH FULL says either all or none of the
296 * attributes can be NULLs
297 */
298 ereport(ERROR,
299 (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
300 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
301 RelationGetRelationName(fk_rel),
302 NameStr(riinfo->conname)),
303 errdetail("MATCH FULL does not allow mixing of null and nonnull key values."),
304 errtableconstraint(fk_rel,
305 NameStr(riinfo->conname))));
306 table_close(pk_rel, RowShareLock);
307 return PointerGetDatum(NULL);
308
309 case FKCONSTR_MATCH_SIMPLE:
310
311 /*
312 * MATCH SIMPLE - if ANY column is null, the key passes
313 * the constraint.
314 */
315 table_close(pk_rel, RowShareLock);
316 return PointerGetDatum(NULL);
317
318 #ifdef NOT_USED
319 case FKCONSTR_MATCH_PARTIAL:
320
321 /*
322 * MATCH PARTIAL - all non-null columns must match. (not
323 * implemented, can be done by modifying the query below
324 * to only include non-null columns, or by writing a
325 * special version here)
326 */
327 break;
328 #endif
329 }
330
331 case RI_KEYS_NONE_NULL:
332
333 /*
334 * Have a full qualified key - continue below for all three kinds
335 * of MATCH.
336 */
337 break;
338 }
339
340 if (SPI_connect() != SPI_OK_CONNECT)
341 elog(ERROR, "SPI_connect failed");
342
343 /* Fetch or prepare a saved plan for the real check */
344 ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
345
346 if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
347 {
348 StringInfoData querybuf;
349 char pkrelname[MAX_QUOTED_REL_NAME_LEN];
350 char attname[MAX_QUOTED_NAME_LEN];
351 char paramname[16];
352 const char *querysep;
353 Oid queryoids[RI_MAX_NUMKEYS];
354 const char *pk_only;
355
356 /* ----------
357 * The query string built is
358 * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
359 * FOR KEY SHARE OF x
360 * The type id's for the $ parameters are those of the
361 * corresponding FK attributes.
362 * ----------
363 */
364 initStringInfo(&querybuf);
365 pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
366 "" : "ONLY ";
367 quoteRelationName(pkrelname, pk_rel);
368 appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
369 pk_only, pkrelname);
370 querysep = "WHERE";
371 for (int i = 0; i < riinfo->nkeys; i++)
372 {
373 Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
374 Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
375
376 quoteOneName(attname,
377 RIAttName(pk_rel, riinfo->pk_attnums[i]));
378 sprintf(paramname, "$%d", i + 1);
379 ri_GenerateQual(&querybuf, querysep,
380 attname, pk_type,
381 riinfo->pf_eq_oprs[i],
382 paramname, fk_type);
383 querysep = "AND";
384 queryoids[i] = fk_type;
385 }
386 appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
387
388 /* Prepare and save the plan */
389 qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
390 &qkey, fk_rel, pk_rel);
391 }
392
393 /*
394 * Now check that foreign key exists in PK table
395 *
396 * XXX detectNewRows must be true when a partitioned table is on the
397 * referenced side. The reason is that our snapshot must be fresh in
398 * order for the hack in find_inheritance_children() to work.
399 */
400 ri_PerformCheck(riinfo, &qkey, qplan,
401 fk_rel, pk_rel,
402 NULL, newslot,
403 pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE,
404 SPI_OK_SELECT);
405
406 if (SPI_finish() != SPI_OK_FINISH)
407 elog(ERROR, "SPI_finish failed");
408
409 table_close(pk_rel, RowShareLock);
410
411 return PointerGetDatum(NULL);
412 }
413
414
415 /*
416 * RI_FKey_check_ins -
417 *
418 * Check foreign key existence at insert event on FK table.
419 */
420 Datum
RI_FKey_check_ins(PG_FUNCTION_ARGS)421 RI_FKey_check_ins(PG_FUNCTION_ARGS)
422 {
423 /* Check that this is a valid trigger call on the right time and event. */
424 ri_CheckTrigger(fcinfo, "RI_FKey_check_ins", RI_TRIGTYPE_INSERT);
425
426 /* Share code with UPDATE case. */
427 return RI_FKey_check((TriggerData *) fcinfo->context);
428 }
429
430
431 /*
432 * RI_FKey_check_upd -
433 *
434 * Check foreign key existence at update event on FK table.
435 */
436 Datum
RI_FKey_check_upd(PG_FUNCTION_ARGS)437 RI_FKey_check_upd(PG_FUNCTION_ARGS)
438 {
439 /* Check that this is a valid trigger call on the right time and event. */
440 ri_CheckTrigger(fcinfo, "RI_FKey_check_upd", RI_TRIGTYPE_UPDATE);
441
442 /* Share code with INSERT case. */
443 return RI_FKey_check((TriggerData *) fcinfo->context);
444 }
445
446
447 /*
448 * ri_Check_Pk_Match
449 *
450 * Check to see if another PK row has been created that provides the same
451 * key values as the "oldslot" that's been modified or deleted in our trigger
452 * event. Returns true if a match is found in the PK table.
453 *
454 * We assume the caller checked that the oldslot contains no NULL key values,
455 * since otherwise a match is impossible.
456 */
457 static bool
ri_Check_Pk_Match(Relation pk_rel,Relation fk_rel,TupleTableSlot * oldslot,const RI_ConstraintInfo * riinfo)458 ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
459 TupleTableSlot *oldslot,
460 const RI_ConstraintInfo *riinfo)
461 {
462 SPIPlanPtr qplan;
463 RI_QueryKey qkey;
464 bool result;
465
466 /* Only called for non-null rows */
467 Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL);
468
469 if (SPI_connect() != SPI_OK_CONNECT)
470 elog(ERROR, "SPI_connect failed");
471
472 /*
473 * Fetch or prepare a saved plan for checking PK table with values coming
474 * from a PK row
475 */
476 ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK_FROM_PK);
477
478 if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
479 {
480 StringInfoData querybuf;
481 char pkrelname[MAX_QUOTED_REL_NAME_LEN];
482 char attname[MAX_QUOTED_NAME_LEN];
483 char paramname[16];
484 const char *querysep;
485 const char *pk_only;
486 Oid queryoids[RI_MAX_NUMKEYS];
487
488 /* ----------
489 * The query string built is
490 * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
491 * FOR KEY SHARE OF x
492 * The type id's for the $ parameters are those of the
493 * PK attributes themselves.
494 * ----------
495 */
496 initStringInfo(&querybuf);
497 pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
498 "" : "ONLY ";
499 quoteRelationName(pkrelname, pk_rel);
500 appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
501 pk_only, pkrelname);
502 querysep = "WHERE";
503 for (int i = 0; i < riinfo->nkeys; i++)
504 {
505 Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
506
507 quoteOneName(attname,
508 RIAttName(pk_rel, riinfo->pk_attnums[i]));
509 sprintf(paramname, "$%d", i + 1);
510 ri_GenerateQual(&querybuf, querysep,
511 attname, pk_type,
512 riinfo->pp_eq_oprs[i],
513 paramname, pk_type);
514 querysep = "AND";
515 queryoids[i] = pk_type;
516 }
517 appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
518
519 /* Prepare and save the plan */
520 qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
521 &qkey, fk_rel, pk_rel);
522 }
523
524 /*
525 * We have a plan now. Run it.
526 */
527 result = ri_PerformCheck(riinfo, &qkey, qplan,
528 fk_rel, pk_rel,
529 oldslot, NULL,
530 true, /* treat like update */
531 SPI_OK_SELECT);
532
533 if (SPI_finish() != SPI_OK_FINISH)
534 elog(ERROR, "SPI_finish failed");
535
536 return result;
537 }
538
539
540 /*
541 * RI_FKey_noaction_del -
542 *
543 * Give an error and roll back the current transaction if the
544 * delete has resulted in a violation of the given referential
545 * integrity constraint.
546 */
547 Datum
RI_FKey_noaction_del(PG_FUNCTION_ARGS)548 RI_FKey_noaction_del(PG_FUNCTION_ARGS)
549 {
550 /* Check that this is a valid trigger call on the right time and event. */
551 ri_CheckTrigger(fcinfo, "RI_FKey_noaction_del", RI_TRIGTYPE_DELETE);
552
553 /* Share code with RESTRICT/UPDATE cases. */
554 return ri_restrict((TriggerData *) fcinfo->context, true);
555 }
556
557 /*
558 * RI_FKey_restrict_del -
559 *
560 * Restrict delete from PK table to rows unreferenced by foreign key.
561 *
562 * The SQL standard intends that this referential action occur exactly when
563 * the delete is performed, rather than after. This appears to be
564 * the only difference between "NO ACTION" and "RESTRICT". In Postgres
565 * we still implement this as an AFTER trigger, but it's non-deferrable.
566 */
567 Datum
RI_FKey_restrict_del(PG_FUNCTION_ARGS)568 RI_FKey_restrict_del(PG_FUNCTION_ARGS)
569 {
570 /* Check that this is a valid trigger call on the right time and event. */
571 ri_CheckTrigger(fcinfo, "RI_FKey_restrict_del", RI_TRIGTYPE_DELETE);
572
573 /* Share code with NO ACTION/UPDATE cases. */
574 return ri_restrict((TriggerData *) fcinfo->context, false);
575 }
576
577 /*
578 * RI_FKey_noaction_upd -
579 *
580 * Give an error and roll back the current transaction if the
581 * update has resulted in a violation of the given referential
582 * integrity constraint.
583 */
584 Datum
RI_FKey_noaction_upd(PG_FUNCTION_ARGS)585 RI_FKey_noaction_upd(PG_FUNCTION_ARGS)
586 {
587 /* Check that this is a valid trigger call on the right time and event. */
588 ri_CheckTrigger(fcinfo, "RI_FKey_noaction_upd", RI_TRIGTYPE_UPDATE);
589
590 /* Share code with RESTRICT/DELETE cases. */
591 return ri_restrict((TriggerData *) fcinfo->context, true);
592 }
593
594 /*
595 * RI_FKey_restrict_upd -
596 *
597 * Restrict update of PK to rows unreferenced by foreign key.
598 *
599 * The SQL standard intends that this referential action occur exactly when
600 * the update is performed, rather than after. This appears to be
601 * the only difference between "NO ACTION" and "RESTRICT". In Postgres
602 * we still implement this as an AFTER trigger, but it's non-deferrable.
603 */
604 Datum
RI_FKey_restrict_upd(PG_FUNCTION_ARGS)605 RI_FKey_restrict_upd(PG_FUNCTION_ARGS)
606 {
607 /* Check that this is a valid trigger call on the right time and event. */
608 ri_CheckTrigger(fcinfo, "RI_FKey_restrict_upd", RI_TRIGTYPE_UPDATE);
609
610 /* Share code with NO ACTION/DELETE cases. */
611 return ri_restrict((TriggerData *) fcinfo->context, false);
612 }
613
614 /*
615 * ri_restrict -
616 *
617 * Common code for ON DELETE RESTRICT, ON DELETE NO ACTION,
618 * ON UPDATE RESTRICT, and ON UPDATE NO ACTION.
619 */
620 static Datum
ri_restrict(TriggerData * trigdata,bool is_no_action)621 ri_restrict(TriggerData *trigdata, bool is_no_action)
622 {
623 const RI_ConstraintInfo *riinfo;
624 Relation fk_rel;
625 Relation pk_rel;
626 TupleTableSlot *oldslot;
627 RI_QueryKey qkey;
628 SPIPlanPtr qplan;
629
630 riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
631 trigdata->tg_relation, true);
632
633 /*
634 * Get the relation descriptors of the FK and PK tables and the old tuple.
635 *
636 * fk_rel is opened in RowShareLock mode since that's what our eventual
637 * SELECT FOR KEY SHARE will get on it.
638 */
639 fk_rel = table_open(riinfo->fk_relid, RowShareLock);
640 pk_rel = trigdata->tg_relation;
641 oldslot = trigdata->tg_trigslot;
642
643 /*
644 * If another PK row now exists providing the old key values, we should
645 * not do anything. However, this check should only be made in the NO
646 * ACTION case; in RESTRICT cases we don't wish to allow another row to be
647 * substituted.
648 */
649 if (is_no_action &&
650 ri_Check_Pk_Match(pk_rel, fk_rel, oldslot, riinfo))
651 {
652 table_close(fk_rel, RowShareLock);
653 return PointerGetDatum(NULL);
654 }
655
656 if (SPI_connect() != SPI_OK_CONNECT)
657 elog(ERROR, "SPI_connect failed");
658
659 /*
660 * Fetch or prepare a saved plan for the restrict lookup (it's the same
661 * query for delete and update cases)
662 */
663 ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_RESTRICT_CHECKREF);
664
665 if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
666 {
667 StringInfoData querybuf;
668 char fkrelname[MAX_QUOTED_REL_NAME_LEN];
669 char attname[MAX_QUOTED_NAME_LEN];
670 char paramname[16];
671 const char *querysep;
672 Oid queryoids[RI_MAX_NUMKEYS];
673 const char *fk_only;
674
675 /* ----------
676 * The query string built is
677 * SELECT 1 FROM [ONLY] <fktable> x WHERE $1 = fkatt1 [AND ...]
678 * FOR KEY SHARE OF x
679 * The type id's for the $ parameters are those of the
680 * corresponding PK attributes.
681 * ----------
682 */
683 initStringInfo(&querybuf);
684 fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
685 "" : "ONLY ";
686 quoteRelationName(fkrelname, fk_rel);
687 appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
688 fk_only, fkrelname);
689 querysep = "WHERE";
690 for (int i = 0; i < riinfo->nkeys; i++)
691 {
692 Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
693 Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
694 Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
695 Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
696
697 quoteOneName(attname,
698 RIAttName(fk_rel, riinfo->fk_attnums[i]));
699 sprintf(paramname, "$%d", i + 1);
700 ri_GenerateQual(&querybuf, querysep,
701 paramname, pk_type,
702 riinfo->pf_eq_oprs[i],
703 attname, fk_type);
704 if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
705 ri_GenerateQualCollation(&querybuf, pk_coll);
706 querysep = "AND";
707 queryoids[i] = pk_type;
708 }
709 appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
710
711 /* Prepare and save the plan */
712 qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
713 &qkey, fk_rel, pk_rel);
714 }
715
716 /*
717 * We have a plan now. Run it to check for existing references.
718 */
719 ri_PerformCheck(riinfo, &qkey, qplan,
720 fk_rel, pk_rel,
721 oldslot, NULL,
722 true, /* must detect new rows */
723 SPI_OK_SELECT);
724
725 if (SPI_finish() != SPI_OK_FINISH)
726 elog(ERROR, "SPI_finish failed");
727
728 table_close(fk_rel, RowShareLock);
729
730 return PointerGetDatum(NULL);
731 }
732
733
734 /*
735 * RI_FKey_cascade_del -
736 *
737 * Cascaded delete foreign key references at delete event on PK table.
738 */
739 Datum
RI_FKey_cascade_del(PG_FUNCTION_ARGS)740 RI_FKey_cascade_del(PG_FUNCTION_ARGS)
741 {
742 TriggerData *trigdata = (TriggerData *) fcinfo->context;
743 const RI_ConstraintInfo *riinfo;
744 Relation fk_rel;
745 Relation pk_rel;
746 TupleTableSlot *oldslot;
747 RI_QueryKey qkey;
748 SPIPlanPtr qplan;
749
750 /* Check that this is a valid trigger call on the right time and event. */
751 ri_CheckTrigger(fcinfo, "RI_FKey_cascade_del", RI_TRIGTYPE_DELETE);
752
753 riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
754 trigdata->tg_relation, true);
755
756 /*
757 * Get the relation descriptors of the FK and PK tables and the old tuple.
758 *
759 * fk_rel is opened in RowExclusiveLock mode since that's what our
760 * eventual DELETE will get on it.
761 */
762 fk_rel = table_open(riinfo->fk_relid, RowExclusiveLock);
763 pk_rel = trigdata->tg_relation;
764 oldslot = trigdata->tg_trigslot;
765
766 if (SPI_connect() != SPI_OK_CONNECT)
767 elog(ERROR, "SPI_connect failed");
768
769 /* Fetch or prepare a saved plan for the cascaded delete */
770 ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_DEL_DODELETE);
771
772 if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
773 {
774 StringInfoData querybuf;
775 char fkrelname[MAX_QUOTED_REL_NAME_LEN];
776 char attname[MAX_QUOTED_NAME_LEN];
777 char paramname[16];
778 const char *querysep;
779 Oid queryoids[RI_MAX_NUMKEYS];
780 const char *fk_only;
781
782 /* ----------
783 * The query string built is
784 * DELETE FROM [ONLY] <fktable> WHERE $1 = fkatt1 [AND ...]
785 * The type id's for the $ parameters are those of the
786 * corresponding PK attributes.
787 * ----------
788 */
789 initStringInfo(&querybuf);
790 fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
791 "" : "ONLY ";
792 quoteRelationName(fkrelname, fk_rel);
793 appendStringInfo(&querybuf, "DELETE FROM %s%s",
794 fk_only, fkrelname);
795 querysep = "WHERE";
796 for (int i = 0; i < riinfo->nkeys; i++)
797 {
798 Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
799 Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
800 Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
801 Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
802
803 quoteOneName(attname,
804 RIAttName(fk_rel, riinfo->fk_attnums[i]));
805 sprintf(paramname, "$%d", i + 1);
806 ri_GenerateQual(&querybuf, querysep,
807 paramname, pk_type,
808 riinfo->pf_eq_oprs[i],
809 attname, fk_type);
810 if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
811 ri_GenerateQualCollation(&querybuf, pk_coll);
812 querysep = "AND";
813 queryoids[i] = pk_type;
814 }
815
816 /* Prepare and save the plan */
817 qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
818 &qkey, fk_rel, pk_rel);
819 }
820
821 /*
822 * We have a plan now. Build up the arguments from the key values in the
823 * deleted PK tuple and delete the referencing rows
824 */
825 ri_PerformCheck(riinfo, &qkey, qplan,
826 fk_rel, pk_rel,
827 oldslot, NULL,
828 true, /* must detect new rows */
829 SPI_OK_DELETE);
830
831 if (SPI_finish() != SPI_OK_FINISH)
832 elog(ERROR, "SPI_finish failed");
833
834 table_close(fk_rel, RowExclusiveLock);
835
836 return PointerGetDatum(NULL);
837 }
838
839
840 /*
841 * RI_FKey_cascade_upd -
842 *
843 * Cascaded update foreign key references at update event on PK table.
844 */
845 Datum
RI_FKey_cascade_upd(PG_FUNCTION_ARGS)846 RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
847 {
848 TriggerData *trigdata = (TriggerData *) fcinfo->context;
849 const RI_ConstraintInfo *riinfo;
850 Relation fk_rel;
851 Relation pk_rel;
852 TupleTableSlot *newslot;
853 TupleTableSlot *oldslot;
854 RI_QueryKey qkey;
855 SPIPlanPtr qplan;
856
857 /* Check that this is a valid trigger call on the right time and event. */
858 ri_CheckTrigger(fcinfo, "RI_FKey_cascade_upd", RI_TRIGTYPE_UPDATE);
859
860 riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
861 trigdata->tg_relation, true);
862
863 /*
864 * Get the relation descriptors of the FK and PK tables and the new and
865 * old tuple.
866 *
867 * fk_rel is opened in RowExclusiveLock mode since that's what our
868 * eventual UPDATE will get on it.
869 */
870 fk_rel = table_open(riinfo->fk_relid, RowExclusiveLock);
871 pk_rel = trigdata->tg_relation;
872 newslot = trigdata->tg_newslot;
873 oldslot = trigdata->tg_trigslot;
874
875 if (SPI_connect() != SPI_OK_CONNECT)
876 elog(ERROR, "SPI_connect failed");
877
878 /* Fetch or prepare a saved plan for the cascaded update */
879 ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_UPD_DOUPDATE);
880
881 if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
882 {
883 StringInfoData querybuf;
884 StringInfoData qualbuf;
885 char fkrelname[MAX_QUOTED_REL_NAME_LEN];
886 char attname[MAX_QUOTED_NAME_LEN];
887 char paramname[16];
888 const char *querysep;
889 const char *qualsep;
890 Oid queryoids[RI_MAX_NUMKEYS * 2];
891 const char *fk_only;
892
893 /* ----------
894 * The query string built is
895 * UPDATE [ONLY] <fktable> SET fkatt1 = $1 [, ...]
896 * WHERE $n = fkatt1 [AND ...]
897 * The type id's for the $ parameters are those of the
898 * corresponding PK attributes. Note that we are assuming
899 * there is an assignment cast from the PK to the FK type;
900 * else the parser will fail.
901 * ----------
902 */
903 initStringInfo(&querybuf);
904 initStringInfo(&qualbuf);
905 fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
906 "" : "ONLY ";
907 quoteRelationName(fkrelname, fk_rel);
908 appendStringInfo(&querybuf, "UPDATE %s%s SET",
909 fk_only, fkrelname);
910 querysep = "";
911 qualsep = "WHERE";
912 for (int i = 0, j = riinfo->nkeys; i < riinfo->nkeys; i++, j++)
913 {
914 Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
915 Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
916 Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
917 Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
918
919 quoteOneName(attname,
920 RIAttName(fk_rel, riinfo->fk_attnums[i]));
921 appendStringInfo(&querybuf,
922 "%s %s = $%d",
923 querysep, attname, i + 1);
924 sprintf(paramname, "$%d", j + 1);
925 ri_GenerateQual(&qualbuf, qualsep,
926 paramname, pk_type,
927 riinfo->pf_eq_oprs[i],
928 attname, fk_type);
929 if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
930 ri_GenerateQualCollation(&querybuf, pk_coll);
931 querysep = ",";
932 qualsep = "AND";
933 queryoids[i] = pk_type;
934 queryoids[j] = pk_type;
935 }
936 appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len);
937
938 /* Prepare and save the plan */
939 qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys * 2, queryoids,
940 &qkey, fk_rel, pk_rel);
941 }
942
943 /*
944 * We have a plan now. Run it to update the existing references.
945 */
946 ri_PerformCheck(riinfo, &qkey, qplan,
947 fk_rel, pk_rel,
948 oldslot, newslot,
949 true, /* must detect new rows */
950 SPI_OK_UPDATE);
951
952 if (SPI_finish() != SPI_OK_FINISH)
953 elog(ERROR, "SPI_finish failed");
954
955 table_close(fk_rel, RowExclusiveLock);
956
957 return PointerGetDatum(NULL);
958 }
959
960
961 /*
962 * RI_FKey_setnull_del -
963 *
964 * Set foreign key references to NULL values at delete event on PK table.
965 */
966 Datum
RI_FKey_setnull_del(PG_FUNCTION_ARGS)967 RI_FKey_setnull_del(PG_FUNCTION_ARGS)
968 {
969 /* Check that this is a valid trigger call on the right time and event. */
970 ri_CheckTrigger(fcinfo, "RI_FKey_setnull_del", RI_TRIGTYPE_DELETE);
971
972 /* Share code with UPDATE case */
973 return ri_set((TriggerData *) fcinfo->context, true);
974 }
975
976 /*
977 * RI_FKey_setnull_upd -
978 *
979 * Set foreign key references to NULL at update event on PK table.
980 */
981 Datum
RI_FKey_setnull_upd(PG_FUNCTION_ARGS)982 RI_FKey_setnull_upd(PG_FUNCTION_ARGS)
983 {
984 /* Check that this is a valid trigger call on the right time and event. */
985 ri_CheckTrigger(fcinfo, "RI_FKey_setnull_upd", RI_TRIGTYPE_UPDATE);
986
987 /* Share code with DELETE case */
988 return ri_set((TriggerData *) fcinfo->context, true);
989 }
990
991 /*
992 * RI_FKey_setdefault_del -
993 *
994 * Set foreign key references to defaults at delete event on PK table.
995 */
996 Datum
RI_FKey_setdefault_del(PG_FUNCTION_ARGS)997 RI_FKey_setdefault_del(PG_FUNCTION_ARGS)
998 {
999 /* Check that this is a valid trigger call on the right time and event. */
1000 ri_CheckTrigger(fcinfo, "RI_FKey_setdefault_del", RI_TRIGTYPE_DELETE);
1001
1002 /* Share code with UPDATE case */
1003 return ri_set((TriggerData *) fcinfo->context, false);
1004 }
1005
1006 /*
1007 * RI_FKey_setdefault_upd -
1008 *
1009 * Set foreign key references to defaults at update event on PK table.
1010 */
1011 Datum
RI_FKey_setdefault_upd(PG_FUNCTION_ARGS)1012 RI_FKey_setdefault_upd(PG_FUNCTION_ARGS)
1013 {
1014 /* Check that this is a valid trigger call on the right time and event. */
1015 ri_CheckTrigger(fcinfo, "RI_FKey_setdefault_upd", RI_TRIGTYPE_UPDATE);
1016
1017 /* Share code with DELETE case */
1018 return ri_set((TriggerData *) fcinfo->context, false);
1019 }
1020
1021 /*
1022 * ri_set -
1023 *
1024 * Common code for ON DELETE SET NULL, ON DELETE SET DEFAULT, ON UPDATE SET
1025 * NULL, and ON UPDATE SET DEFAULT.
1026 */
1027 static Datum
ri_set(TriggerData * trigdata,bool is_set_null)1028 ri_set(TriggerData *trigdata, bool is_set_null)
1029 {
1030 const RI_ConstraintInfo *riinfo;
1031 Relation fk_rel;
1032 Relation pk_rel;
1033 TupleTableSlot *oldslot;
1034 RI_QueryKey qkey;
1035 SPIPlanPtr qplan;
1036
1037 riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
1038 trigdata->tg_relation, true);
1039
1040 /*
1041 * Get the relation descriptors of the FK and PK tables and the old tuple.
1042 *
1043 * fk_rel is opened in RowExclusiveLock mode since that's what our
1044 * eventual UPDATE will get on it.
1045 */
1046 fk_rel = table_open(riinfo->fk_relid, RowExclusiveLock);
1047 pk_rel = trigdata->tg_relation;
1048 oldslot = trigdata->tg_trigslot;
1049
1050 if (SPI_connect() != SPI_OK_CONNECT)
1051 elog(ERROR, "SPI_connect failed");
1052
1053 /*
1054 * Fetch or prepare a saved plan for the set null/default operation (it's
1055 * the same query for delete and update cases)
1056 */
1057 ri_BuildQueryKey(&qkey, riinfo,
1058 (is_set_null
1059 ? RI_PLAN_SETNULL_DOUPDATE
1060 : RI_PLAN_SETDEFAULT_DOUPDATE));
1061
1062 if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
1063 {
1064 StringInfoData querybuf;
1065 StringInfoData qualbuf;
1066 char fkrelname[MAX_QUOTED_REL_NAME_LEN];
1067 char attname[MAX_QUOTED_NAME_LEN];
1068 char paramname[16];
1069 const char *querysep;
1070 const char *qualsep;
1071 Oid queryoids[RI_MAX_NUMKEYS];
1072 const char *fk_only;
1073
1074 /* ----------
1075 * The query string built is
1076 * UPDATE [ONLY] <fktable> SET fkatt1 = {NULL|DEFAULT} [, ...]
1077 * WHERE $1 = fkatt1 [AND ...]
1078 * The type id's for the $ parameters are those of the
1079 * corresponding PK attributes.
1080 * ----------
1081 */
1082 initStringInfo(&querybuf);
1083 initStringInfo(&qualbuf);
1084 fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
1085 "" : "ONLY ";
1086 quoteRelationName(fkrelname, fk_rel);
1087 appendStringInfo(&querybuf, "UPDATE %s%s SET",
1088 fk_only, fkrelname);
1089 querysep = "";
1090 qualsep = "WHERE";
1091 for (int i = 0; i < riinfo->nkeys; i++)
1092 {
1093 Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
1094 Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
1095 Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
1096 Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
1097
1098 quoteOneName(attname,
1099 RIAttName(fk_rel, riinfo->fk_attnums[i]));
1100 appendStringInfo(&querybuf,
1101 "%s %s = %s",
1102 querysep, attname,
1103 is_set_null ? "NULL" : "DEFAULT");
1104 sprintf(paramname, "$%d", i + 1);
1105 ri_GenerateQual(&qualbuf, qualsep,
1106 paramname, pk_type,
1107 riinfo->pf_eq_oprs[i],
1108 attname, fk_type);
1109 if (pk_coll != fk_coll && !get_collation_isdeterministic(pk_coll))
1110 ri_GenerateQualCollation(&querybuf, pk_coll);
1111 querysep = ",";
1112 qualsep = "AND";
1113 queryoids[i] = pk_type;
1114 }
1115 appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len);
1116
1117 /* Prepare and save the plan */
1118 qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
1119 &qkey, fk_rel, pk_rel);
1120 }
1121
1122 /*
1123 * We have a plan now. Run it to update the existing references.
1124 */
1125 ri_PerformCheck(riinfo, &qkey, qplan,
1126 fk_rel, pk_rel,
1127 oldslot, NULL,
1128 true, /* must detect new rows */
1129 SPI_OK_UPDATE);
1130
1131 if (SPI_finish() != SPI_OK_FINISH)
1132 elog(ERROR, "SPI_finish failed");
1133
1134 table_close(fk_rel, RowExclusiveLock);
1135
1136 if (is_set_null)
1137 return PointerGetDatum(NULL);
1138 else
1139 {
1140 /*
1141 * If we just deleted or updated the PK row whose key was equal to the
1142 * FK columns' default values, and a referencing row exists in the FK
1143 * table, we would have updated that row to the same values it already
1144 * had --- and RI_FKey_fk_upd_check_required would hence believe no
1145 * check is necessary. So we need to do another lookup now and in
1146 * case a reference still exists, abort the operation. That is
1147 * already implemented in the NO ACTION trigger, so just run it. (This
1148 * recheck is only needed in the SET DEFAULT case, since CASCADE would
1149 * remove such rows in case of a DELETE operation or would change the
1150 * FK key values in case of an UPDATE, while SET NULL is certain to
1151 * result in rows that satisfy the FK constraint.)
1152 */
1153 return ri_restrict(trigdata, true);
1154 }
1155 }
1156
1157
1158 /*
1159 * RI_FKey_pk_upd_check_required -
1160 *
1161 * Check if we really need to fire the RI trigger for an update or delete to a PK
1162 * relation. This is called by the AFTER trigger queue manager to see if
1163 * it can skip queuing an instance of an RI trigger. Returns true if the
1164 * trigger must be fired, false if we can prove the constraint will still
1165 * be satisfied.
1166 *
1167 * newslot will be NULL if this is called for a delete.
1168 */
1169 bool
RI_FKey_pk_upd_check_required(Trigger * trigger,Relation pk_rel,TupleTableSlot * oldslot,TupleTableSlot * newslot)1170 RI_FKey_pk_upd_check_required(Trigger *trigger, Relation pk_rel,
1171 TupleTableSlot *oldslot, TupleTableSlot *newslot)
1172 {
1173 const RI_ConstraintInfo *riinfo;
1174
1175 riinfo = ri_FetchConstraintInfo(trigger, pk_rel, true);
1176
1177 /*
1178 * If any old key value is NULL, the row could not have been referenced by
1179 * an FK row, so no check is needed.
1180 */
1181 if (ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) != RI_KEYS_NONE_NULL)
1182 return false;
1183
1184 /* If all old and new key values are equal, no check is needed */
1185 if (newslot && ri_KeysEqual(pk_rel, oldslot, newslot, riinfo, true))
1186 return false;
1187
1188 /* Else we need to fire the trigger. */
1189 return true;
1190 }
1191
1192 /*
1193 * RI_FKey_fk_upd_check_required -
1194 *
1195 * Check if we really need to fire the RI trigger for an update to an FK
1196 * relation. This is called by the AFTER trigger queue manager to see if
1197 * it can skip queuing an instance of an RI trigger. Returns true if the
1198 * trigger must be fired, false if we can prove the constraint will still
1199 * be satisfied.
1200 */
1201 bool
RI_FKey_fk_upd_check_required(Trigger * trigger,Relation fk_rel,TupleTableSlot * oldslot,TupleTableSlot * newslot)1202 RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel,
1203 TupleTableSlot *oldslot, TupleTableSlot *newslot)
1204 {
1205 const RI_ConstraintInfo *riinfo;
1206 int ri_nullcheck;
1207 Datum xminDatum;
1208 TransactionId xmin;
1209 bool isnull;
1210
1211 riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
1212
1213 ri_nullcheck = ri_NullCheck(RelationGetDescr(fk_rel), newslot, riinfo, false);
1214
1215 /*
1216 * If all new key values are NULL, the row satisfies the constraint, so no
1217 * check is needed.
1218 */
1219 if (ri_nullcheck == RI_KEYS_ALL_NULL)
1220 return false;
1221
1222 /*
1223 * If some new key values are NULL, the behavior depends on the match
1224 * type.
1225 */
1226 else if (ri_nullcheck == RI_KEYS_SOME_NULL)
1227 {
1228 switch (riinfo->confmatchtype)
1229 {
1230 case FKCONSTR_MATCH_SIMPLE:
1231
1232 /*
1233 * If any new key value is NULL, the row must satisfy the
1234 * constraint, so no check is needed.
1235 */
1236 return false;
1237
1238 case FKCONSTR_MATCH_PARTIAL:
1239
1240 /*
1241 * Don't know, must run full check.
1242 */
1243 break;
1244
1245 case FKCONSTR_MATCH_FULL:
1246
1247 /*
1248 * If some new key values are NULL, the row fails the
1249 * constraint. We must not throw error here, because the row
1250 * might get invalidated before the constraint is to be
1251 * checked, but we should queue the event to apply the check
1252 * later.
1253 */
1254 return true;
1255 }
1256 }
1257
1258 /*
1259 * Continues here for no new key values are NULL, or we couldn't decide
1260 * yet.
1261 */
1262
1263 /*
1264 * If the original row was inserted by our own transaction, we must fire
1265 * the trigger whether or not the keys are equal. This is because our
1266 * UPDATE will invalidate the INSERT so that the INSERT RI trigger will
1267 * not do anything; so we had better do the UPDATE check. (We could skip
1268 * this if we knew the INSERT trigger already fired, but there is no easy
1269 * way to know that.)
1270 */
1271 xminDatum = slot_getsysattr(oldslot, MinTransactionIdAttributeNumber, &isnull);
1272 Assert(!isnull);
1273 xmin = DatumGetTransactionId(xminDatum);
1274 if (TransactionIdIsCurrentTransactionId(xmin))
1275 return true;
1276
1277 /* If all old and new key values are equal, no check is needed */
1278 if (ri_KeysEqual(fk_rel, oldslot, newslot, riinfo, false))
1279 return false;
1280
1281 /* Else we need to fire the trigger. */
1282 return true;
1283 }
1284
1285 /*
1286 * RI_Initial_Check -
1287 *
1288 * Check an entire table for non-matching values using a single query.
1289 * This is not a trigger procedure, but is called during ALTER TABLE
1290 * ADD FOREIGN KEY to validate the initial table contents.
1291 *
1292 * We expect that the caller has made provision to prevent any problems
1293 * caused by concurrent actions. This could be either by locking rel and
1294 * pkrel at ShareRowExclusiveLock or higher, or by otherwise ensuring
1295 * that triggers implementing the checks are already active.
1296 * Hence, we do not need to lock individual rows for the check.
1297 *
1298 * If the check fails because the current user doesn't have permissions
1299 * to read both tables, return false to let our caller know that they will
1300 * need to do something else to check the constraint.
1301 */
1302 bool
RI_Initial_Check(Trigger * trigger,Relation fk_rel,Relation pk_rel)1303 RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
1304 {
1305 const RI_ConstraintInfo *riinfo;
1306 StringInfoData querybuf;
1307 char pkrelname[MAX_QUOTED_REL_NAME_LEN];
1308 char fkrelname[MAX_QUOTED_REL_NAME_LEN];
1309 char pkattname[MAX_QUOTED_NAME_LEN + 3];
1310 char fkattname[MAX_QUOTED_NAME_LEN + 3];
1311 RangeTblEntry *pkrte;
1312 RangeTblEntry *fkrte;
1313 const char *sep;
1314 const char *fk_only;
1315 const char *pk_only;
1316 int save_nestlevel;
1317 char workmembuf[32];
1318 int spi_result;
1319 SPIPlanPtr qplan;
1320
1321 riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
1322
1323 /*
1324 * Check to make sure current user has enough permissions to do the test
1325 * query. (If not, caller can fall back to the trigger method, which
1326 * works because it changes user IDs on the fly.)
1327 *
1328 * XXX are there any other show-stopper conditions to check?
1329 */
1330 pkrte = makeNode(RangeTblEntry);
1331 pkrte->rtekind = RTE_RELATION;
1332 pkrte->relid = RelationGetRelid(pk_rel);
1333 pkrte->relkind = pk_rel->rd_rel->relkind;
1334 pkrte->rellockmode = AccessShareLock;
1335 pkrte->requiredPerms = ACL_SELECT;
1336
1337 fkrte = makeNode(RangeTblEntry);
1338 fkrte->rtekind = RTE_RELATION;
1339 fkrte->relid = RelationGetRelid(fk_rel);
1340 fkrte->relkind = fk_rel->rd_rel->relkind;
1341 fkrte->rellockmode = AccessShareLock;
1342 fkrte->requiredPerms = ACL_SELECT;
1343
1344 for (int i = 0; i < riinfo->nkeys; i++)
1345 {
1346 int attno;
1347
1348 attno = riinfo->pk_attnums[i] - FirstLowInvalidHeapAttributeNumber;
1349 pkrte->selectedCols = bms_add_member(pkrte->selectedCols, attno);
1350
1351 attno = riinfo->fk_attnums[i] - FirstLowInvalidHeapAttributeNumber;
1352 fkrte->selectedCols = bms_add_member(fkrte->selectedCols, attno);
1353 }
1354
1355 if (!ExecCheckRTPerms(list_make2(fkrte, pkrte), false))
1356 return false;
1357
1358 /*
1359 * Also punt if RLS is enabled on either table unless this role has the
1360 * bypassrls right or is the table owner of the table(s) involved which
1361 * have RLS enabled.
1362 */
1363 if (!has_bypassrls_privilege(GetUserId()) &&
1364 ((pk_rel->rd_rel->relrowsecurity &&
1365 !pg_class_ownercheck(pkrte->relid, GetUserId())) ||
1366 (fk_rel->rd_rel->relrowsecurity &&
1367 !pg_class_ownercheck(fkrte->relid, GetUserId()))))
1368 return false;
1369
1370 /*----------
1371 * The query string built is:
1372 * SELECT fk.keycols FROM [ONLY] relname fk
1373 * LEFT OUTER JOIN [ONLY] pkrelname pk
1374 * ON (pk.pkkeycol1=fk.keycol1 [AND ...])
1375 * WHERE pk.pkkeycol1 IS NULL AND
1376 * For MATCH SIMPLE:
1377 * (fk.keycol1 IS NOT NULL [AND ...])
1378 * For MATCH FULL:
1379 * (fk.keycol1 IS NOT NULL [OR ...])
1380 *
1381 * We attach COLLATE clauses to the operators when comparing columns
1382 * that have different collations.
1383 *----------
1384 */
1385 initStringInfo(&querybuf);
1386 appendStringInfoString(&querybuf, "SELECT ");
1387 sep = "";
1388 for (int i = 0; i < riinfo->nkeys; i++)
1389 {
1390 quoteOneName(fkattname,
1391 RIAttName(fk_rel, riinfo->fk_attnums[i]));
1392 appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
1393 sep = ", ";
1394 }
1395
1396 quoteRelationName(pkrelname, pk_rel);
1397 quoteRelationName(fkrelname, fk_rel);
1398 fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
1399 "" : "ONLY ";
1400 pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
1401 "" : "ONLY ";
1402 appendStringInfo(&querybuf,
1403 " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON",
1404 fk_only, fkrelname, pk_only, pkrelname);
1405
1406 strcpy(pkattname, "pk.");
1407 strcpy(fkattname, "fk.");
1408 sep = "(";
1409 for (int i = 0; i < riinfo->nkeys; i++)
1410 {
1411 Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
1412 Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
1413 Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
1414 Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
1415
1416 quoteOneName(pkattname + 3,
1417 RIAttName(pk_rel, riinfo->pk_attnums[i]));
1418 quoteOneName(fkattname + 3,
1419 RIAttName(fk_rel, riinfo->fk_attnums[i]));
1420 ri_GenerateQual(&querybuf, sep,
1421 pkattname, pk_type,
1422 riinfo->pf_eq_oprs[i],
1423 fkattname, fk_type);
1424 if (pk_coll != fk_coll)
1425 ri_GenerateQualCollation(&querybuf, pk_coll);
1426 sep = "AND";
1427 }
1428
1429 /*
1430 * It's sufficient to test any one pk attribute for null to detect a join
1431 * failure.
1432 */
1433 quoteOneName(pkattname, RIAttName(pk_rel, riinfo->pk_attnums[0]));
1434 appendStringInfo(&querybuf, ") WHERE pk.%s IS NULL AND (", pkattname);
1435
1436 sep = "";
1437 for (int i = 0; i < riinfo->nkeys; i++)
1438 {
1439 quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
1440 appendStringInfo(&querybuf,
1441 "%sfk.%s IS NOT NULL",
1442 sep, fkattname);
1443 switch (riinfo->confmatchtype)
1444 {
1445 case FKCONSTR_MATCH_SIMPLE:
1446 sep = " AND ";
1447 break;
1448 case FKCONSTR_MATCH_FULL:
1449 sep = " OR ";
1450 break;
1451 }
1452 }
1453 appendStringInfoChar(&querybuf, ')');
1454
1455 /*
1456 * Temporarily increase work_mem so that the check query can be executed
1457 * more efficiently. It seems okay to do this because the query is simple
1458 * enough to not use a multiple of work_mem, and one typically would not
1459 * have many large foreign-key validations happening concurrently. So
1460 * this seems to meet the criteria for being considered a "maintenance"
1461 * operation, and accordingly we use maintenance_work_mem. However, we
1462 * must also set hash_mem_multiplier to 1, since it is surely not okay to
1463 * let that get applied to the maintenance_work_mem value.
1464 *
1465 * We use the equivalent of a function SET option to allow the setting to
1466 * persist for exactly the duration of the check query. guc.c also takes
1467 * care of undoing the setting on error.
1468 */
1469 save_nestlevel = NewGUCNestLevel();
1470
1471 snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem);
1472 (void) set_config_option("work_mem", workmembuf,
1473 PGC_USERSET, PGC_S_SESSION,
1474 GUC_ACTION_SAVE, true, 0, false);
1475 (void) set_config_option("hash_mem_multiplier", "1",
1476 PGC_USERSET, PGC_S_SESSION,
1477 GUC_ACTION_SAVE, true, 0, false);
1478
1479 if (SPI_connect() != SPI_OK_CONNECT)
1480 elog(ERROR, "SPI_connect failed");
1481
1482 /*
1483 * Generate the plan. We don't need to cache it, and there are no
1484 * arguments to the plan.
1485 */
1486 qplan = SPI_prepare(querybuf.data, 0, NULL);
1487
1488 if (qplan == NULL)
1489 elog(ERROR, "SPI_prepare returned %s for %s",
1490 SPI_result_code_string(SPI_result), querybuf.data);
1491
1492 /*
1493 * Run the plan. For safety we force a current snapshot to be used. (In
1494 * transaction-snapshot mode, this arguably violates transaction isolation
1495 * rules, but we really haven't got much choice.) We don't need to
1496 * register the snapshot, because SPI_execute_snapshot will see to it. We
1497 * need at most one tuple returned, so pass limit = 1.
1498 */
1499 spi_result = SPI_execute_snapshot(qplan,
1500 NULL, NULL,
1501 GetLatestSnapshot(),
1502 InvalidSnapshot,
1503 true, false, 1);
1504
1505 /* Check result */
1506 if (spi_result != SPI_OK_SELECT)
1507 elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
1508
1509 /* Did we find a tuple violating the constraint? */
1510 if (SPI_processed > 0)
1511 {
1512 TupleTableSlot *slot;
1513 HeapTuple tuple = SPI_tuptable->vals[0];
1514 TupleDesc tupdesc = SPI_tuptable->tupdesc;
1515 RI_ConstraintInfo fake_riinfo;
1516
1517 slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
1518
1519 heap_deform_tuple(tuple, tupdesc,
1520 slot->tts_values, slot->tts_isnull);
1521 ExecStoreVirtualTuple(slot);
1522
1523 /*
1524 * The columns to look at in the result tuple are 1..N, not whatever
1525 * they are in the fk_rel. Hack up riinfo so that the subroutines
1526 * called here will behave properly.
1527 *
1528 * In addition to this, we have to pass the correct tupdesc to
1529 * ri_ReportViolation, overriding its normal habit of using the pk_rel
1530 * or fk_rel's tupdesc.
1531 */
1532 memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
1533 for (int i = 0; i < fake_riinfo.nkeys; i++)
1534 fake_riinfo.fk_attnums[i] = i + 1;
1535
1536 /*
1537 * If it's MATCH FULL, and there are any nulls in the FK keys,
1538 * complain about that rather than the lack of a match. MATCH FULL
1539 * disallows partially-null FK rows.
1540 */
1541 if (fake_riinfo.confmatchtype == FKCONSTR_MATCH_FULL &&
1542 ri_NullCheck(tupdesc, slot, &fake_riinfo, false) != RI_KEYS_NONE_NULL)
1543 ereport(ERROR,
1544 (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
1545 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
1546 RelationGetRelationName(fk_rel),
1547 NameStr(fake_riinfo.conname)),
1548 errdetail("MATCH FULL does not allow mixing of null and nonnull key values."),
1549 errtableconstraint(fk_rel,
1550 NameStr(fake_riinfo.conname))));
1551
1552 /*
1553 * We tell ri_ReportViolation we were doing the RI_PLAN_CHECK_LOOKUPPK
1554 * query, which isn't true, but will cause it to use
1555 * fake_riinfo.fk_attnums as we need.
1556 */
1557 ri_ReportViolation(&fake_riinfo,
1558 pk_rel, fk_rel,
1559 slot, tupdesc,
1560 RI_PLAN_CHECK_LOOKUPPK, false);
1561
1562 ExecDropSingleTupleTableSlot(slot);
1563 }
1564
1565 if (SPI_finish() != SPI_OK_FINISH)
1566 elog(ERROR, "SPI_finish failed");
1567
1568 /*
1569 * Restore work_mem and hash_mem_multiplier.
1570 */
1571 AtEOXact_GUC(true, save_nestlevel);
1572
1573 return true;
1574 }
1575
1576 /*
1577 * RI_PartitionRemove_Check -
1578 *
1579 * Verify no referencing values exist, when a partition is detached on
1580 * the referenced side of a foreign key constraint.
1581 */
1582 void
RI_PartitionRemove_Check(Trigger * trigger,Relation fk_rel,Relation pk_rel)1583 RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
1584 {
1585 const RI_ConstraintInfo *riinfo;
1586 StringInfoData querybuf;
1587 char *constraintDef;
1588 char pkrelname[MAX_QUOTED_REL_NAME_LEN];
1589 char fkrelname[MAX_QUOTED_REL_NAME_LEN];
1590 char pkattname[MAX_QUOTED_NAME_LEN + 3];
1591 char fkattname[MAX_QUOTED_NAME_LEN + 3];
1592 const char *sep;
1593 const char *fk_only;
1594 int save_nestlevel;
1595 char workmembuf[32];
1596 int spi_result;
1597 SPIPlanPtr qplan;
1598 int i;
1599
1600 riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
1601
1602 /*
1603 * We don't check permissions before displaying the error message, on the
1604 * assumption that the user detaching the partition must have enough
1605 * privileges to examine the table contents anyhow.
1606 */
1607
1608 /*----------
1609 * The query string built is:
1610 * SELECT fk.keycols FROM [ONLY] relname fk
1611 * JOIN pkrelname pk
1612 * ON (pk.pkkeycol1=fk.keycol1 [AND ...])
1613 * WHERE (<partition constraint>) AND
1614 * For MATCH SIMPLE:
1615 * (fk.keycol1 IS NOT NULL [AND ...])
1616 * For MATCH FULL:
1617 * (fk.keycol1 IS NOT NULL [OR ...])
1618 *
1619 * We attach COLLATE clauses to the operators when comparing columns
1620 * that have different collations.
1621 *----------
1622 */
1623 initStringInfo(&querybuf);
1624 appendStringInfoString(&querybuf, "SELECT ");
1625 sep = "";
1626 for (i = 0; i < riinfo->nkeys; i++)
1627 {
1628 quoteOneName(fkattname,
1629 RIAttName(fk_rel, riinfo->fk_attnums[i]));
1630 appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
1631 sep = ", ";
1632 }
1633
1634 quoteRelationName(pkrelname, pk_rel);
1635 quoteRelationName(fkrelname, fk_rel);
1636 fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
1637 "" : "ONLY ";
1638 appendStringInfo(&querybuf,
1639 " FROM %s%s fk JOIN %s pk ON",
1640 fk_only, fkrelname, pkrelname);
1641 strcpy(pkattname, "pk.");
1642 strcpy(fkattname, "fk.");
1643 sep = "(";
1644 for (i = 0; i < riinfo->nkeys; i++)
1645 {
1646 Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
1647 Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
1648 Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
1649 Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
1650
1651 quoteOneName(pkattname + 3,
1652 RIAttName(pk_rel, riinfo->pk_attnums[i]));
1653 quoteOneName(fkattname + 3,
1654 RIAttName(fk_rel, riinfo->fk_attnums[i]));
1655 ri_GenerateQual(&querybuf, sep,
1656 pkattname, pk_type,
1657 riinfo->pf_eq_oprs[i],
1658 fkattname, fk_type);
1659 if (pk_coll != fk_coll)
1660 ri_GenerateQualCollation(&querybuf, pk_coll);
1661 sep = "AND";
1662 }
1663
1664 /*
1665 * Start the WHERE clause with the partition constraint (except if this is
1666 * the default partition and there's no other partition, because the
1667 * partition constraint is the empty string in that case.)
1668 */
1669 constraintDef = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk");
1670 if (constraintDef && constraintDef[0] != '\0')
1671 appendStringInfo(&querybuf, ") WHERE %s AND (",
1672 constraintDef);
1673 else
1674 appendStringInfoString(&querybuf, ") WHERE (");
1675
1676 sep = "";
1677 for (i = 0; i < riinfo->nkeys; i++)
1678 {
1679 quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
1680 appendStringInfo(&querybuf,
1681 "%sfk.%s IS NOT NULL",
1682 sep, fkattname);
1683 switch (riinfo->confmatchtype)
1684 {
1685 case FKCONSTR_MATCH_SIMPLE:
1686 sep = " AND ";
1687 break;
1688 case FKCONSTR_MATCH_FULL:
1689 sep = " OR ";
1690 break;
1691 }
1692 }
1693 appendStringInfoChar(&querybuf, ')');
1694
1695 /*
1696 * Temporarily increase work_mem so that the check query can be executed
1697 * more efficiently. It seems okay to do this because the query is simple
1698 * enough to not use a multiple of work_mem, and one typically would not
1699 * have many large foreign-key validations happening concurrently. So
1700 * this seems to meet the criteria for being considered a "maintenance"
1701 * operation, and accordingly we use maintenance_work_mem. However, we
1702 * must also set hash_mem_multiplier to 1, since it is surely not okay to
1703 * let that get applied to the maintenance_work_mem value.
1704 *
1705 * We use the equivalent of a function SET option to allow the setting to
1706 * persist for exactly the duration of the check query. guc.c also takes
1707 * care of undoing the setting on error.
1708 */
1709 save_nestlevel = NewGUCNestLevel();
1710
1711 snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem);
1712 (void) set_config_option("work_mem", workmembuf,
1713 PGC_USERSET, PGC_S_SESSION,
1714 GUC_ACTION_SAVE, true, 0, false);
1715 (void) set_config_option("hash_mem_multiplier", "1",
1716 PGC_USERSET, PGC_S_SESSION,
1717 GUC_ACTION_SAVE, true, 0, false);
1718
1719 if (SPI_connect() != SPI_OK_CONNECT)
1720 elog(ERROR, "SPI_connect failed");
1721
1722 /*
1723 * Generate the plan. We don't need to cache it, and there are no
1724 * arguments to the plan.
1725 */
1726 qplan = SPI_prepare(querybuf.data, 0, NULL);
1727
1728 if (qplan == NULL)
1729 elog(ERROR, "SPI_prepare returned %s for %s",
1730 SPI_result_code_string(SPI_result), querybuf.data);
1731
1732 /*
1733 * Run the plan. For safety we force a current snapshot to be used. (In
1734 * transaction-snapshot mode, this arguably violates transaction isolation
1735 * rules, but we really haven't got much choice.) We don't need to
1736 * register the snapshot, because SPI_execute_snapshot will see to it. We
1737 * need at most one tuple returned, so pass limit = 1.
1738 */
1739 spi_result = SPI_execute_snapshot(qplan,
1740 NULL, NULL,
1741 GetLatestSnapshot(),
1742 InvalidSnapshot,
1743 true, false, 1);
1744
1745 /* Check result */
1746 if (spi_result != SPI_OK_SELECT)
1747 elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
1748
1749 /* Did we find a tuple that would violate the constraint? */
1750 if (SPI_processed > 0)
1751 {
1752 TupleTableSlot *slot;
1753 HeapTuple tuple = SPI_tuptable->vals[0];
1754 TupleDesc tupdesc = SPI_tuptable->tupdesc;
1755 RI_ConstraintInfo fake_riinfo;
1756
1757 slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
1758
1759 heap_deform_tuple(tuple, tupdesc,
1760 slot->tts_values, slot->tts_isnull);
1761 ExecStoreVirtualTuple(slot);
1762
1763 /*
1764 * The columns to look at in the result tuple are 1..N, not whatever
1765 * they are in the fk_rel. Hack up riinfo so that ri_ReportViolation
1766 * will behave properly.
1767 *
1768 * In addition to this, we have to pass the correct tupdesc to
1769 * ri_ReportViolation, overriding its normal habit of using the pk_rel
1770 * or fk_rel's tupdesc.
1771 */
1772 memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
1773 for (i = 0; i < fake_riinfo.nkeys; i++)
1774 fake_riinfo.pk_attnums[i] = i + 1;
1775
1776 ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
1777 slot, tupdesc, 0, true);
1778 }
1779
1780 if (SPI_finish() != SPI_OK_FINISH)
1781 elog(ERROR, "SPI_finish failed");
1782
1783 /*
1784 * Restore work_mem and hash_mem_multiplier.
1785 */
1786 AtEOXact_GUC(true, save_nestlevel);
1787 }
1788
1789
1790 /* ----------
1791 * Local functions below
1792 * ----------
1793 */
1794
1795
/*
 * quoteOneName --- safely quote a single SQL name
 *
 * Writes name into buffer wrapped in double quotes, doubling every embedded
 * double quote, and NUL-terminates the result.
 *
 * buffer must be MAX_QUOTED_NAME_LEN long (includes room for \0)
 */
static void
quoteOneName(char *buffer, const char *name)
{
	char	   *out = buffer;
	const char *in;

	/* No attempt to be smart: the name is always quoted. */
	*out++ = '"';
	for (in = name; *in != '\0'; in++)
	{
		/* An embedded quote is escaped by emitting it twice. */
		if (*in == '"')
			*out++ = '"';
		*out++ = *in;
	}
	*out++ = '"';
	*out = '\0';
}
1815
1816 /*
1817 * quoteRelationName --- safely quote a fully qualified relation name
1818 *
1819 * buffer must be MAX_QUOTED_REL_NAME_LEN long (includes room for \0)
1820 */
1821 static void
quoteRelationName(char * buffer,Relation rel)1822 quoteRelationName(char *buffer, Relation rel)
1823 {
1824 quoteOneName(buffer, get_namespace_name(RelationGetNamespace(rel)));
1825 buffer += strlen(buffer);
1826 *buffer++ = '.';
1827 quoteOneName(buffer, RelationGetRelationName(rel));
1828 }
1829
1830 /*
1831 * ri_GenerateQual --- generate a WHERE clause equating two variables
1832 *
1833 * This basically appends " sep leftop op rightop" to buf, adding casts
1834 * and schema qualification as needed to ensure that the parser will select
1835 * the operator we specify. leftop and rightop should be parenthesized
1836 * if they aren't variables or parameters.
1837 */
1838 static void
ri_GenerateQual(StringInfo buf,const char * sep,const char * leftop,Oid leftoptype,Oid opoid,const char * rightop,Oid rightoptype)1839 ri_GenerateQual(StringInfo buf,
1840 const char *sep,
1841 const char *leftop, Oid leftoptype,
1842 Oid opoid,
1843 const char *rightop, Oid rightoptype)
1844 {
1845 appendStringInfo(buf, " %s ", sep);
1846 generate_operator_clause(buf, leftop, leftoptype, opoid,
1847 rightop, rightoptype);
1848 }
1849
1850 /*
1851 * ri_GenerateQualCollation --- add a COLLATE spec to a WHERE clause
1852 *
1853 * At present, we intentionally do not use this function for RI queries that
1854 * compare a variable to a $n parameter. Since parameter symbols always have
1855 * default collation, the effect will be to use the variable's collation.
1856 * Now that is only strictly correct when testing the referenced column, since
1857 * the SQL standard specifies that RI comparisons should use the referenced
1858 * column's collation. However, so long as all collations have the same
1859 * notion of equality (which they do, because texteq reduces to bitwise
1860 * equality), there's no visible semantic impact from using the referencing
1861 * column's collation when testing it, and this is a good thing to do because
1862 * it lets us use a normal index on the referencing column. However, we do
1863 * have to use this function when directly comparing the referencing and
1864 * referenced columns, if they are of different collations; else the parser
1865 * will fail to resolve the collation to use.
1866 */
1867 static void
ri_GenerateQualCollation(StringInfo buf,Oid collation)1868 ri_GenerateQualCollation(StringInfo buf, Oid collation)
1869 {
1870 HeapTuple tp;
1871 Form_pg_collation colltup;
1872 char *collname;
1873 char onename[MAX_QUOTED_NAME_LEN];
1874
1875 /* Nothing to do if it's a noncollatable data type */
1876 if (!OidIsValid(collation))
1877 return;
1878
1879 tp = SearchSysCache1(COLLOID, ObjectIdGetDatum(collation));
1880 if (!HeapTupleIsValid(tp))
1881 elog(ERROR, "cache lookup failed for collation %u", collation);
1882 colltup = (Form_pg_collation) GETSTRUCT(tp);
1883 collname = NameStr(colltup->collname);
1884
1885 /*
1886 * We qualify the name always, for simplicity and to ensure the query is
1887 * not search-path-dependent.
1888 */
1889 quoteOneName(onename, get_namespace_name(colltup->collnamespace));
1890 appendStringInfo(buf, " COLLATE %s", onename);
1891 quoteOneName(onename, collname);
1892 appendStringInfo(buf, ".%s", onename);
1893
1894 ReleaseSysCache(tp);
1895 }
1896
1897 /* ----------
1898 * ri_BuildQueryKey -
1899 *
1900 * Construct a hashtable key for a prepared SPI plan of an FK constraint.
1901 *
1902 * key: output argument, *key is filled in based on the other arguments
1903 * riinfo: info derived from pg_constraint entry
1904 * constr_queryno: an internal number identifying the query type
1905 * (see RI_PLAN_XXX constants at head of file)
1906 * ----------
1907 */
1908 static void
ri_BuildQueryKey(RI_QueryKey * key,const RI_ConstraintInfo * riinfo,int32 constr_queryno)1909 ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
1910 int32 constr_queryno)
1911 {
1912 /*
1913 * Inherited constraints with a common ancestor can share ri_query_cache
1914 * entries for all query types except RI_PLAN_CHECK_LOOKUPPK_FROM_PK.
1915 * Except in that case, the query processes the other table involved in
1916 * the FK constraint (i.e., not the table on which the trigger has been
1917 * fired), and so it will be the same for all members of the inheritance
1918 * tree. So we may use the root constraint's OID in the hash key, rather
1919 * than the constraint's own OID. This avoids creating duplicate SPI
1920 * plans, saving lots of work and memory when there are many partitions
1921 * with similar FK constraints.
1922 *
1923 * (Note that we must still have a separate RI_ConstraintInfo for each
1924 * constraint, because partitions can have different column orders,
1925 * resulting in different pk_attnums[] or fk_attnums[] array contents.)
1926 *
1927 * We assume struct RI_QueryKey contains no padding bytes, else we'd need
1928 * to use memset to clear them.
1929 */
1930 if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK)
1931 key->constr_id = riinfo->constraint_root_id;
1932 else
1933 key->constr_id = riinfo->constraint_id;
1934 key->constr_queryno = constr_queryno;
1935 }
1936
1937 /*
1938 * Check that RI trigger function was called in expected context
1939 */
1940 static void
ri_CheckTrigger(FunctionCallInfo fcinfo,const char * funcname,int tgkind)1941 ri_CheckTrigger(FunctionCallInfo fcinfo, const char *funcname, int tgkind)
1942 {
1943 TriggerData *trigdata = (TriggerData *) fcinfo->context;
1944
1945 if (!CALLED_AS_TRIGGER(fcinfo))
1946 ereport(ERROR,
1947 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1948 errmsg("function \"%s\" was not called by trigger manager", funcname)));
1949
1950 /*
1951 * Check proper event
1952 */
1953 if (!TRIGGER_FIRED_AFTER(trigdata->tg_event) ||
1954 !TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
1955 ereport(ERROR,
1956 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1957 errmsg("function \"%s\" must be fired AFTER ROW", funcname)));
1958
1959 switch (tgkind)
1960 {
1961 case RI_TRIGTYPE_INSERT:
1962 if (!TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
1963 ereport(ERROR,
1964 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1965 errmsg("function \"%s\" must be fired for INSERT", funcname)));
1966 break;
1967 case RI_TRIGTYPE_UPDATE:
1968 if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
1969 ereport(ERROR,
1970 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1971 errmsg("function \"%s\" must be fired for UPDATE", funcname)));
1972 break;
1973 case RI_TRIGTYPE_DELETE:
1974 if (!TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
1975 ereport(ERROR,
1976 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
1977 errmsg("function \"%s\" must be fired for DELETE", funcname)));
1978 break;
1979 }
1980 }
1981
1982
1983 /*
1984 * Fetch the RI_ConstraintInfo struct for the trigger's FK constraint.
1985 */
1986 static const RI_ConstraintInfo *
ri_FetchConstraintInfo(Trigger * trigger,Relation trig_rel,bool rel_is_pk)1987 ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
1988 {
1989 Oid constraintOid = trigger->tgconstraint;
1990 const RI_ConstraintInfo *riinfo;
1991
1992 /*
1993 * Check that the FK constraint's OID is available; it might not be if
1994 * we've been invoked via an ordinary trigger or an old-style "constraint
1995 * trigger".
1996 */
1997 if (!OidIsValid(constraintOid))
1998 ereport(ERROR,
1999 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2000 errmsg("no pg_constraint entry for trigger \"%s\" on table \"%s\"",
2001 trigger->tgname, RelationGetRelationName(trig_rel)),
2002 errhint("Remove this referential integrity trigger and its mates, then do ALTER TABLE ADD CONSTRAINT.")));
2003
2004 /* Find or create a hashtable entry for the constraint */
2005 riinfo = ri_LoadConstraintInfo(constraintOid);
2006
2007 /* Do some easy cross-checks against the trigger call data */
2008 if (rel_is_pk)
2009 {
2010 if (riinfo->fk_relid != trigger->tgconstrrelid ||
2011 riinfo->pk_relid != RelationGetRelid(trig_rel))
2012 elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
2013 trigger->tgname, RelationGetRelationName(trig_rel));
2014 }
2015 else
2016 {
2017 if (riinfo->fk_relid != RelationGetRelid(trig_rel) ||
2018 riinfo->pk_relid != trigger->tgconstrrelid)
2019 elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
2020 trigger->tgname, RelationGetRelationName(trig_rel));
2021 }
2022
2023 if (riinfo->confmatchtype != FKCONSTR_MATCH_FULL &&
2024 riinfo->confmatchtype != FKCONSTR_MATCH_PARTIAL &&
2025 riinfo->confmatchtype != FKCONSTR_MATCH_SIMPLE)
2026 elog(ERROR, "unrecognized confmatchtype: %d",
2027 riinfo->confmatchtype);
2028
2029 if (riinfo->confmatchtype == FKCONSTR_MATCH_PARTIAL)
2030 ereport(ERROR,
2031 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2032 errmsg("MATCH PARTIAL not yet implemented")));
2033
2034 return riinfo;
2035 }
2036
2037 /*
2038 * Fetch or create the RI_ConstraintInfo struct for an FK constraint.
2039 */
2040 static const RI_ConstraintInfo *
ri_LoadConstraintInfo(Oid constraintOid)2041 ri_LoadConstraintInfo(Oid constraintOid)
2042 {
2043 RI_ConstraintInfo *riinfo;
2044 bool found;
2045 HeapTuple tup;
2046 Form_pg_constraint conForm;
2047
2048 /*
2049 * On the first call initialize the hashtable
2050 */
2051 if (!ri_constraint_cache)
2052 ri_InitHashTables();
2053
2054 /*
2055 * Find or create a hash entry. If we find a valid one, just return it.
2056 */
2057 riinfo = (RI_ConstraintInfo *) hash_search(ri_constraint_cache,
2058 (void *) &constraintOid,
2059 HASH_ENTER, &found);
2060 if (!found)
2061 riinfo->valid = false;
2062 else if (riinfo->valid)
2063 return riinfo;
2064
2065 /*
2066 * Fetch the pg_constraint row so we can fill in the entry.
2067 */
2068 tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraintOid));
2069 if (!HeapTupleIsValid(tup)) /* should not happen */
2070 elog(ERROR, "cache lookup failed for constraint %u", constraintOid);
2071 conForm = (Form_pg_constraint) GETSTRUCT(tup);
2072
2073 if (conForm->contype != CONSTRAINT_FOREIGN) /* should not happen */
2074 elog(ERROR, "constraint %u is not a foreign key constraint",
2075 constraintOid);
2076
2077 /* And extract data */
2078 Assert(riinfo->constraint_id == constraintOid);
2079 if (OidIsValid(conForm->conparentid))
2080 riinfo->constraint_root_id =
2081 get_ri_constraint_root(conForm->conparentid);
2082 else
2083 riinfo->constraint_root_id = constraintOid;
2084 riinfo->oidHashValue = GetSysCacheHashValue1(CONSTROID,
2085 ObjectIdGetDatum(constraintOid));
2086 riinfo->rootHashValue = GetSysCacheHashValue1(CONSTROID,
2087 ObjectIdGetDatum(riinfo->constraint_root_id));
2088 memcpy(&riinfo->conname, &conForm->conname, sizeof(NameData));
2089 riinfo->pk_relid = conForm->confrelid;
2090 riinfo->fk_relid = conForm->conrelid;
2091 riinfo->confupdtype = conForm->confupdtype;
2092 riinfo->confdeltype = conForm->confdeltype;
2093 riinfo->confmatchtype = conForm->confmatchtype;
2094
2095 DeconstructFkConstraintRow(tup,
2096 &riinfo->nkeys,
2097 riinfo->fk_attnums,
2098 riinfo->pk_attnums,
2099 riinfo->pf_eq_oprs,
2100 riinfo->pp_eq_oprs,
2101 riinfo->ff_eq_oprs);
2102
2103 ReleaseSysCache(tup);
2104
2105 /*
2106 * For efficient processing of invalidation messages below, we keep a
2107 * doubly-linked list, and a count, of all currently valid entries.
2108 */
2109 dlist_push_tail(&ri_constraint_cache_valid_list, &riinfo->valid_link);
2110 ri_constraint_cache_valid_count++;
2111
2112 riinfo->valid = true;
2113
2114 return riinfo;
2115 }
2116
2117 /*
2118 * get_ri_constraint_root
2119 * Returns the OID of the constraint's root parent
2120 */
2121 static Oid
get_ri_constraint_root(Oid constrOid)2122 get_ri_constraint_root(Oid constrOid)
2123 {
2124 for (;;)
2125 {
2126 HeapTuple tuple;
2127 Oid constrParentOid;
2128
2129 tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid));
2130 if (!HeapTupleIsValid(tuple))
2131 elog(ERROR, "cache lookup failed for constraint %u", constrOid);
2132 constrParentOid = ((Form_pg_constraint) GETSTRUCT(tuple))->conparentid;
2133 ReleaseSysCache(tuple);
2134 if (!OidIsValid(constrParentOid))
2135 break; /* we reached the root constraint */
2136 constrOid = constrParentOid;
2137 }
2138 return constrOid;
2139 }
2140
2141 /*
2142 * Callback for pg_constraint inval events
2143 *
2144 * While most syscache callbacks just flush all their entries, pg_constraint
2145 * gets enough update traffic that it's probably worth being smarter.
2146 * Invalidate any ri_constraint_cache entry associated with the syscache
2147 * entry with the specified hash value, or all entries if hashvalue == 0.
2148 *
2149 * Note: at the time a cache invalidation message is processed there may be
2150 * active references to the cache. Because of this we never remove entries
2151 * from the cache, but only mark them invalid, which is harmless to active
2152 * uses. (Any query using an entry should hold a lock sufficient to keep that
2153 * data from changing under it --- but we may get cache flushes anyway.)
2154 */
2155 static void
InvalidateConstraintCacheCallBack(Datum arg,int cacheid,uint32 hashvalue)2156 InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue)
2157 {
2158 dlist_mutable_iter iter;
2159
2160 Assert(ri_constraint_cache != NULL);
2161
2162 /*
2163 * If the list of currently valid entries gets excessively large, we mark
2164 * them all invalid so we can empty the list. This arrangement avoids
2165 * O(N^2) behavior in situations where a session touches many foreign keys
2166 * and also does many ALTER TABLEs, such as a restore from pg_dump.
2167 */
2168 if (ri_constraint_cache_valid_count > 1000)
2169 hashvalue = 0; /* pretend it's a cache reset */
2170
2171 dlist_foreach_modify(iter, &ri_constraint_cache_valid_list)
2172 {
2173 RI_ConstraintInfo *riinfo = dlist_container(RI_ConstraintInfo,
2174 valid_link, iter.cur);
2175
2176 /*
2177 * We must invalidate not only entries directly matching the given
2178 * hash value, but also child entries, in case the invalidation
2179 * affects a root constraint.
2180 */
2181 if (hashvalue == 0 ||
2182 riinfo->oidHashValue == hashvalue ||
2183 riinfo->rootHashValue == hashvalue)
2184 {
2185 riinfo->valid = false;
2186 /* Remove invalidated entries from the list, too */
2187 dlist_delete(iter.cur);
2188 ri_constraint_cache_valid_count--;
2189 }
2190 }
2191 }
2192
2193
2194 /*
2195 * Prepare execution plan for a query to enforce an RI restriction
2196 */
2197 static SPIPlanPtr
ri_PlanCheck(const char * querystr,int nargs,Oid * argtypes,RI_QueryKey * qkey,Relation fk_rel,Relation pk_rel)2198 ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
2199 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
2200 {
2201 SPIPlanPtr qplan;
2202 Relation query_rel;
2203 Oid save_userid;
2204 int save_sec_context;
2205
2206 /*
2207 * Use the query type code to determine whether the query is run against
2208 * the PK or FK table; we'll do the check as that table's owner
2209 */
2210 if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
2211 query_rel = pk_rel;
2212 else
2213 query_rel = fk_rel;
2214
2215 /* Switch to proper UID to perform check as */
2216 GetUserIdAndSecContext(&save_userid, &save_sec_context);
2217 SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
2218 save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
2219 SECURITY_NOFORCE_RLS);
2220
2221 /* Create the plan */
2222 qplan = SPI_prepare(querystr, nargs, argtypes);
2223
2224 if (qplan == NULL)
2225 elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), querystr);
2226
2227 /* Restore UID and security context */
2228 SetUserIdAndSecContext(save_userid, save_sec_context);
2229
2230 /* Save the plan */
2231 SPI_keepplan(qplan);
2232 ri_HashPreparedPlan(qkey, qplan);
2233
2234 return qplan;
2235 }
2236
2237 /*
2238 * Perform a query to enforce an RI restriction
2239 */
2240 static bool
ri_PerformCheck(const RI_ConstraintInfo * riinfo,RI_QueryKey * qkey,SPIPlanPtr qplan,Relation fk_rel,Relation pk_rel,TupleTableSlot * oldslot,TupleTableSlot * newslot,bool detectNewRows,int expect_OK)2241 ri_PerformCheck(const RI_ConstraintInfo *riinfo,
2242 RI_QueryKey *qkey, SPIPlanPtr qplan,
2243 Relation fk_rel, Relation pk_rel,
2244 TupleTableSlot *oldslot, TupleTableSlot *newslot,
2245 bool detectNewRows, int expect_OK)
2246 {
2247 Relation query_rel,
2248 source_rel;
2249 bool source_is_pk;
2250 Snapshot test_snapshot;
2251 Snapshot crosscheck_snapshot;
2252 int limit;
2253 int spi_result;
2254 Oid save_userid;
2255 int save_sec_context;
2256 Datum vals[RI_MAX_NUMKEYS * 2];
2257 char nulls[RI_MAX_NUMKEYS * 2];
2258
2259 /*
2260 * Use the query type code to determine whether the query is run against
2261 * the PK or FK table; we'll do the check as that table's owner
2262 */
2263 if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
2264 query_rel = pk_rel;
2265 else
2266 query_rel = fk_rel;
2267
2268 /*
2269 * The values for the query are taken from the table on which the trigger
2270 * is called - it is normally the other one with respect to query_rel. An
2271 * exception is ri_Check_Pk_Match(), which uses the PK table for both (and
2272 * sets queryno to RI_PLAN_CHECK_LOOKUPPK_FROM_PK). We might eventually
2273 * need some less klugy way to determine this.
2274 */
2275 if (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)
2276 {
2277 source_rel = fk_rel;
2278 source_is_pk = false;
2279 }
2280 else
2281 {
2282 source_rel = pk_rel;
2283 source_is_pk = true;
2284 }
2285
2286 /* Extract the parameters to be passed into the query */
2287 if (newslot)
2288 {
2289 ri_ExtractValues(source_rel, newslot, riinfo, source_is_pk,
2290 vals, nulls);
2291 if (oldslot)
2292 ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
2293 vals + riinfo->nkeys, nulls + riinfo->nkeys);
2294 }
2295 else
2296 {
2297 ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
2298 vals, nulls);
2299 }
2300
2301 /*
2302 * In READ COMMITTED mode, we just need to use an up-to-date regular
2303 * snapshot, and we will see all rows that could be interesting. But in
2304 * transaction-snapshot mode, we can't change the transaction snapshot. If
2305 * the caller passes detectNewRows == false then it's okay to do the query
2306 * with the transaction snapshot; otherwise we use a current snapshot, and
2307 * tell the executor to error out if it finds any rows under the current
2308 * snapshot that wouldn't be visible per the transaction snapshot. Note
2309 * that SPI_execute_snapshot will register the snapshots, so we don't need
2310 * to bother here.
2311 */
2312 if (IsolationUsesXactSnapshot() && detectNewRows)
2313 {
2314 CommandCounterIncrement(); /* be sure all my own work is visible */
2315 test_snapshot = GetLatestSnapshot();
2316 crosscheck_snapshot = GetTransactionSnapshot();
2317 }
2318 else
2319 {
2320 /* the default SPI behavior is okay */
2321 test_snapshot = InvalidSnapshot;
2322 crosscheck_snapshot = InvalidSnapshot;
2323 }
2324
2325 /*
2326 * If this is a select query (e.g., for a 'no action' or 'restrict'
2327 * trigger), we only need to see if there is a single row in the table,
2328 * matching the key. Otherwise, limit = 0 - because we want the query to
2329 * affect ALL the matching rows.
2330 */
2331 limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0;
2332
2333 /* Switch to proper UID to perform check as */
2334 GetUserIdAndSecContext(&save_userid, &save_sec_context);
2335 SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
2336 save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
2337 SECURITY_NOFORCE_RLS);
2338
2339 /* Finally we can run the query. */
2340 spi_result = SPI_execute_snapshot(qplan,
2341 vals, nulls,
2342 test_snapshot, crosscheck_snapshot,
2343 false, false, limit);
2344
2345 /* Restore UID and security context */
2346 SetUserIdAndSecContext(save_userid, save_sec_context);
2347
2348 /* Check result */
2349 if (spi_result < 0)
2350 elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
2351
2352 if (expect_OK >= 0 && spi_result != expect_OK)
2353 ereport(ERROR,
2354 (errcode(ERRCODE_INTERNAL_ERROR),
2355 errmsg("referential integrity query on \"%s\" from constraint \"%s\" on \"%s\" gave unexpected result",
2356 RelationGetRelationName(pk_rel),
2357 NameStr(riinfo->conname),
2358 RelationGetRelationName(fk_rel)),
2359 errhint("This is most likely due to a rule having rewritten the query.")));
2360
2361 /* XXX wouldn't it be clearer to do this part at the caller? */
2362 if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
2363 expect_OK == SPI_OK_SELECT &&
2364 (SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
2365 ri_ReportViolation(riinfo,
2366 pk_rel, fk_rel,
2367 newslot ? newslot : oldslot,
2368 NULL,
2369 qkey->constr_queryno, false);
2370
2371 return SPI_processed != 0;
2372 }
2373
2374 /*
2375 * Extract fields from a tuple into Datum/nulls arrays
2376 */
2377 static void
ri_ExtractValues(Relation rel,TupleTableSlot * slot,const RI_ConstraintInfo * riinfo,bool rel_is_pk,Datum * vals,char * nulls)2378 ri_ExtractValues(Relation rel, TupleTableSlot *slot,
2379 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
2380 Datum *vals, char *nulls)
2381 {
2382 const int16 *attnums;
2383 bool isnull;
2384
2385 if (rel_is_pk)
2386 attnums = riinfo->pk_attnums;
2387 else
2388 attnums = riinfo->fk_attnums;
2389
2390 for (int i = 0; i < riinfo->nkeys; i++)
2391 {
2392 vals[i] = slot_getattr(slot, attnums[i], &isnull);
2393 nulls[i] = isnull ? 'n' : ' ';
2394 }
2395 }
2396
2397 /*
2398 * Produce an error report
2399 *
2400 * If the failed constraint was on insert/update to the FK table,
2401 * we want the key names and values extracted from there, and the error
2402 * message to look like 'key blah is not present in PK'.
2403 * Otherwise, the attr names and values come from the PK table and the
2404 * message looks like 'key blah is still referenced from FK'.
2405 */
2406 static void
ri_ReportViolation(const RI_ConstraintInfo * riinfo,Relation pk_rel,Relation fk_rel,TupleTableSlot * violatorslot,TupleDesc tupdesc,int queryno,bool partgone)2407 ri_ReportViolation(const RI_ConstraintInfo *riinfo,
2408 Relation pk_rel, Relation fk_rel,
2409 TupleTableSlot *violatorslot, TupleDesc tupdesc,
2410 int queryno, bool partgone)
2411 {
2412 StringInfoData key_names;
2413 StringInfoData key_values;
2414 bool onfk;
2415 const int16 *attnums;
2416 Oid rel_oid;
2417 AclResult aclresult;
2418 bool has_perm = true;
2419
2420 /*
2421 * Determine which relation to complain about. If tupdesc wasn't passed
2422 * by caller, assume the violator tuple came from there.
2423 */
2424 onfk = (queryno == RI_PLAN_CHECK_LOOKUPPK);
2425 if (onfk)
2426 {
2427 attnums = riinfo->fk_attnums;
2428 rel_oid = fk_rel->rd_id;
2429 if (tupdesc == NULL)
2430 tupdesc = fk_rel->rd_att;
2431 }
2432 else
2433 {
2434 attnums = riinfo->pk_attnums;
2435 rel_oid = pk_rel->rd_id;
2436 if (tupdesc == NULL)
2437 tupdesc = pk_rel->rd_att;
2438 }
2439
2440 /*
2441 * Check permissions- if the user does not have access to view the data in
2442 * any of the key columns then we don't include the errdetail() below.
2443 *
2444 * Check if RLS is enabled on the relation first. If so, we don't return
2445 * any specifics to avoid leaking data.
2446 *
2447 * Check table-level permissions next and, failing that, column-level
2448 * privileges.
2449 *
2450 * When a partition at the referenced side is being detached/dropped, we
2451 * needn't check, since the user must be the table owner anyway.
2452 */
2453 if (partgone)
2454 has_perm = true;
2455 else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
2456 {
2457 aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT);
2458 if (aclresult != ACLCHECK_OK)
2459 {
2460 /* Try for column-level permissions */
2461 for (int idx = 0; idx < riinfo->nkeys; idx++)
2462 {
2463 aclresult = pg_attribute_aclcheck(rel_oid, attnums[idx],
2464 GetUserId(),
2465 ACL_SELECT);
2466
2467 /* No access to the key */
2468 if (aclresult != ACLCHECK_OK)
2469 {
2470 has_perm = false;
2471 break;
2472 }
2473 }
2474 }
2475 }
2476 else
2477 has_perm = false;
2478
2479 if (has_perm)
2480 {
2481 /* Get printable versions of the keys involved */
2482 initStringInfo(&key_names);
2483 initStringInfo(&key_values);
2484 for (int idx = 0; idx < riinfo->nkeys; idx++)
2485 {
2486 int fnum = attnums[idx];
2487 Form_pg_attribute att = TupleDescAttr(tupdesc, fnum - 1);
2488 char *name,
2489 *val;
2490 Datum datum;
2491 bool isnull;
2492
2493 name = NameStr(att->attname);
2494
2495 datum = slot_getattr(violatorslot, fnum, &isnull);
2496 if (!isnull)
2497 {
2498 Oid foutoid;
2499 bool typisvarlena;
2500
2501 getTypeOutputInfo(att->atttypid, &foutoid, &typisvarlena);
2502 val = OidOutputFunctionCall(foutoid, datum);
2503 }
2504 else
2505 val = "null";
2506
2507 if (idx > 0)
2508 {
2509 appendStringInfoString(&key_names, ", ");
2510 appendStringInfoString(&key_values, ", ");
2511 }
2512 appendStringInfoString(&key_names, name);
2513 appendStringInfoString(&key_values, val);
2514 }
2515 }
2516
2517 if (partgone)
2518 ereport(ERROR,
2519 (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
2520 errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"",
2521 RelationGetRelationName(pk_rel),
2522 NameStr(riinfo->conname)),
2523 errdetail("Key (%s)=(%s) is still referenced from table \"%s\".",
2524 key_names.data, key_values.data,
2525 RelationGetRelationName(fk_rel)),
2526 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
2527 else if (onfk)
2528 ereport(ERROR,
2529 (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
2530 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
2531 RelationGetRelationName(fk_rel),
2532 NameStr(riinfo->conname)),
2533 has_perm ?
2534 errdetail("Key (%s)=(%s) is not present in table \"%s\".",
2535 key_names.data, key_values.data,
2536 RelationGetRelationName(pk_rel)) :
2537 errdetail("Key is not present in table \"%s\".",
2538 RelationGetRelationName(pk_rel)),
2539 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
2540 else
2541 ereport(ERROR,
2542 (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
2543 errmsg("update or delete on table \"%s\" violates foreign key constraint \"%s\" on table \"%s\"",
2544 RelationGetRelationName(pk_rel),
2545 NameStr(riinfo->conname),
2546 RelationGetRelationName(fk_rel)),
2547 has_perm ?
2548 errdetail("Key (%s)=(%s) is still referenced from table \"%s\".",
2549 key_names.data, key_values.data,
2550 RelationGetRelationName(fk_rel)) :
2551 errdetail("Key is still referenced from table \"%s\".",
2552 RelationGetRelationName(fk_rel)),
2553 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
2554 }
2555
2556
2557 /*
2558 * ri_NullCheck -
2559 *
2560 * Determine the NULL state of all key values in a tuple
2561 *
2562 * Returns one of RI_KEYS_ALL_NULL, RI_KEYS_NONE_NULL or RI_KEYS_SOME_NULL.
2563 */
2564 static int
ri_NullCheck(TupleDesc tupDesc,TupleTableSlot * slot,const RI_ConstraintInfo * riinfo,bool rel_is_pk)2565 ri_NullCheck(TupleDesc tupDesc,
2566 TupleTableSlot *slot,
2567 const RI_ConstraintInfo *riinfo, bool rel_is_pk)
2568 {
2569 const int16 *attnums;
2570 bool allnull = true;
2571 bool nonenull = true;
2572
2573 if (rel_is_pk)
2574 attnums = riinfo->pk_attnums;
2575 else
2576 attnums = riinfo->fk_attnums;
2577
2578 for (int i = 0; i < riinfo->nkeys; i++)
2579 {
2580 if (slot_attisnull(slot, attnums[i]))
2581 nonenull = false;
2582 else
2583 allnull = false;
2584 }
2585
2586 if (allnull)
2587 return RI_KEYS_ALL_NULL;
2588
2589 if (nonenull)
2590 return RI_KEYS_NONE_NULL;
2591
2592 return RI_KEYS_SOME_NULL;
2593 }
2594
2595
2596 /*
2597 * ri_InitHashTables -
2598 *
2599 * Initialize our internal hash tables.
2600 */
2601 static void
ri_InitHashTables(void)2602 ri_InitHashTables(void)
2603 {
2604 HASHCTL ctl;
2605
2606 ctl.keysize = sizeof(Oid);
2607 ctl.entrysize = sizeof(RI_ConstraintInfo);
2608 ri_constraint_cache = hash_create("RI constraint cache",
2609 RI_INIT_CONSTRAINTHASHSIZE,
2610 &ctl, HASH_ELEM | HASH_BLOBS);
2611
2612 /* Arrange to flush cache on pg_constraint changes */
2613 CacheRegisterSyscacheCallback(CONSTROID,
2614 InvalidateConstraintCacheCallBack,
2615 (Datum) 0);
2616
2617 ctl.keysize = sizeof(RI_QueryKey);
2618 ctl.entrysize = sizeof(RI_QueryHashEntry);
2619 ri_query_cache = hash_create("RI query cache",
2620 RI_INIT_QUERYHASHSIZE,
2621 &ctl, HASH_ELEM | HASH_BLOBS);
2622
2623 ctl.keysize = sizeof(RI_CompareKey);
2624 ctl.entrysize = sizeof(RI_CompareHashEntry);
2625 ri_compare_cache = hash_create("RI compare cache",
2626 RI_INIT_QUERYHASHSIZE,
2627 &ctl, HASH_ELEM | HASH_BLOBS);
2628 }
2629
2630
2631 /*
2632 * ri_FetchPreparedPlan -
2633 *
2634 * Lookup for a query key in our private hash table of prepared
2635 * and saved SPI execution plans. Return the plan if found or NULL.
2636 */
2637 static SPIPlanPtr
ri_FetchPreparedPlan(RI_QueryKey * key)2638 ri_FetchPreparedPlan(RI_QueryKey *key)
2639 {
2640 RI_QueryHashEntry *entry;
2641 SPIPlanPtr plan;
2642
2643 /*
2644 * On the first call initialize the hashtable
2645 */
2646 if (!ri_query_cache)
2647 ri_InitHashTables();
2648
2649 /*
2650 * Lookup for the key
2651 */
2652 entry = (RI_QueryHashEntry *) hash_search(ri_query_cache,
2653 (void *) key,
2654 HASH_FIND, NULL);
2655 if (entry == NULL)
2656 return NULL;
2657
2658 /*
2659 * Check whether the plan is still valid. If it isn't, we don't want to
2660 * simply rely on plancache.c to regenerate it; rather we should start
2661 * from scratch and rebuild the query text too. This is to cover cases
2662 * such as table/column renames. We depend on the plancache machinery to
2663 * detect possible invalidations, though.
2664 *
2665 * CAUTION: this check is only trustworthy if the caller has already
2666 * locked both FK and PK rels.
2667 */
2668 plan = entry->plan;
2669 if (plan && SPI_plan_is_valid(plan))
2670 return plan;
2671
2672 /*
2673 * Otherwise we might as well flush the cached plan now, to free a little
2674 * memory space before we make a new one.
2675 */
2676 entry->plan = NULL;
2677 if (plan)
2678 SPI_freeplan(plan);
2679
2680 return NULL;
2681 }
2682
2683
2684 /*
2685 * ri_HashPreparedPlan -
2686 *
2687 * Add another plan to our private SPI query plan hashtable.
2688 */
2689 static void
ri_HashPreparedPlan(RI_QueryKey * key,SPIPlanPtr plan)2690 ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan)
2691 {
2692 RI_QueryHashEntry *entry;
2693 bool found;
2694
2695 /*
2696 * On the first call initialize the hashtable
2697 */
2698 if (!ri_query_cache)
2699 ri_InitHashTables();
2700
2701 /*
2702 * Add the new plan. We might be overwriting an entry previously found
2703 * invalid by ri_FetchPreparedPlan.
2704 */
2705 entry = (RI_QueryHashEntry *) hash_search(ri_query_cache,
2706 (void *) key,
2707 HASH_ENTER, &found);
2708 Assert(!found || entry->plan == NULL);
2709 entry->plan = plan;
2710 }
2711
2712
2713 /*
2714 * ri_KeysEqual -
2715 *
2716 * Check if all key values in OLD and NEW are equal.
2717 *
2718 * Note: at some point we might wish to redefine this as checking for
2719 * "IS NOT DISTINCT" rather than "=", that is, allow two nulls to be
2720 * considered equal. Currently there is no need since all callers have
2721 * previously found at least one of the rows to contain no nulls.
2722 */
2723 static bool
ri_KeysEqual(Relation rel,TupleTableSlot * oldslot,TupleTableSlot * newslot,const RI_ConstraintInfo * riinfo,bool rel_is_pk)2724 ri_KeysEqual(Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot,
2725 const RI_ConstraintInfo *riinfo, bool rel_is_pk)
2726 {
2727 const int16 *attnums;
2728
2729 if (rel_is_pk)
2730 attnums = riinfo->pk_attnums;
2731 else
2732 attnums = riinfo->fk_attnums;
2733
2734 /* XXX: could be worthwhile to fetch all necessary attrs at once */
2735 for (int i = 0; i < riinfo->nkeys; i++)
2736 {
2737 Datum oldvalue;
2738 Datum newvalue;
2739 bool isnull;
2740
2741 /*
2742 * Get one attribute's oldvalue. If it is NULL - they're not equal.
2743 */
2744 oldvalue = slot_getattr(oldslot, attnums[i], &isnull);
2745 if (isnull)
2746 return false;
2747
2748 /*
2749 * Get one attribute's newvalue. If it is NULL - they're not equal.
2750 */
2751 newvalue = slot_getattr(newslot, attnums[i], &isnull);
2752 if (isnull)
2753 return false;
2754
2755 if (rel_is_pk)
2756 {
2757 /*
2758 * If we are looking at the PK table, then do a bytewise
2759 * comparison. We must propagate PK changes if the value is
2760 * changed to one that "looks" different but would compare as
2761 * equal using the equality operator. This only makes a
2762 * difference for ON UPDATE CASCADE, but for consistency we treat
2763 * all changes to the PK the same.
2764 */
2765 Form_pg_attribute att = TupleDescAttr(oldslot->tts_tupleDescriptor, attnums[i] - 1);
2766
2767 if (!datum_image_eq(oldvalue, newvalue, att->attbyval, att->attlen))
2768 return false;
2769 }
2770 else
2771 {
2772 /*
2773 * For the FK table, compare with the appropriate equality
2774 * operator. Changes that compare equal will still satisfy the
2775 * constraint after the update.
2776 */
2777 if (!ri_AttributesEqual(riinfo->ff_eq_oprs[i], RIAttType(rel, attnums[i]),
2778 oldvalue, newvalue))
2779 return false;
2780 }
2781 }
2782
2783 return true;
2784 }
2785
2786
2787 /*
2788 * ri_AttributesEqual -
2789 *
2790 * Call the appropriate equality comparison operator for two values.
2791 *
2792 * NB: we have already checked that neither value is null.
2793 */
2794 static bool
ri_AttributesEqual(Oid eq_opr,Oid typeid,Datum oldvalue,Datum newvalue)2795 ri_AttributesEqual(Oid eq_opr, Oid typeid,
2796 Datum oldvalue, Datum newvalue)
2797 {
2798 RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
2799
2800 /* Do we need to cast the values? */
2801 if (OidIsValid(entry->cast_func_finfo.fn_oid))
2802 {
2803 oldvalue = FunctionCall3(&entry->cast_func_finfo,
2804 oldvalue,
2805 Int32GetDatum(-1), /* typmod */
2806 BoolGetDatum(false)); /* implicit coercion */
2807 newvalue = FunctionCall3(&entry->cast_func_finfo,
2808 newvalue,
2809 Int32GetDatum(-1), /* typmod */
2810 BoolGetDatum(false)); /* implicit coercion */
2811 }
2812
2813 /*
2814 * Apply the comparison operator.
2815 *
2816 * Note: This function is part of a call stack that determines whether an
2817 * update to a row is significant enough that it needs checking or action
2818 * on the other side of a foreign-key constraint. Therefore, the
2819 * comparison here would need to be done with the collation of the *other*
2820 * table. For simplicity (e.g., we might not even have the other table
2821 * open), we'll just use the default collation here, which could lead to
2822 * some false negatives. All this would break if we ever allow
2823 * database-wide collations to be nondeterministic.
2824 */
2825 return DatumGetBool(FunctionCall2Coll(&entry->eq_opr_finfo,
2826 DEFAULT_COLLATION_OID,
2827 oldvalue, newvalue));
2828 }
2829
2830 /*
2831 * ri_HashCompareOp -
2832 *
2833 * See if we know how to compare two values, and create a new hash entry
2834 * if not.
2835 */
2836 static RI_CompareHashEntry *
ri_HashCompareOp(Oid eq_opr,Oid typeid)2837 ri_HashCompareOp(Oid eq_opr, Oid typeid)
2838 {
2839 RI_CompareKey key;
2840 RI_CompareHashEntry *entry;
2841 bool found;
2842
2843 /*
2844 * On the first call initialize the hashtable
2845 */
2846 if (!ri_compare_cache)
2847 ri_InitHashTables();
2848
2849 /*
2850 * Find or create a hash entry. Note we're assuming RI_CompareKey
2851 * contains no struct padding.
2852 */
2853 key.eq_opr = eq_opr;
2854 key.typeid = typeid;
2855 entry = (RI_CompareHashEntry *) hash_search(ri_compare_cache,
2856 (void *) &key,
2857 HASH_ENTER, &found);
2858 if (!found)
2859 entry->valid = false;
2860
2861 /*
2862 * If not already initialized, do so. Since we'll keep this hash entry
2863 * for the life of the backend, put any subsidiary info for the function
2864 * cache structs into TopMemoryContext.
2865 */
2866 if (!entry->valid)
2867 {
2868 Oid lefttype,
2869 righttype,
2870 castfunc;
2871 CoercionPathType pathtype;
2872
2873 /* We always need to know how to call the equality operator */
2874 fmgr_info_cxt(get_opcode(eq_opr), &entry->eq_opr_finfo,
2875 TopMemoryContext);
2876
2877 /*
2878 * If we chose to use a cast from FK to PK type, we may have to apply
2879 * the cast function to get to the operator's input type.
2880 *
2881 * XXX eventually it would be good to support array-coercion cases
2882 * here and in ri_AttributesEqual(). At the moment there is no point
2883 * because cases involving nonidentical array types will be rejected
2884 * at constraint creation time.
2885 *
2886 * XXX perhaps also consider supporting CoerceViaIO? No need at the
2887 * moment since that will never be generated for implicit coercions.
2888 */
2889 op_input_types(eq_opr, &lefttype, &righttype);
2890 Assert(lefttype == righttype);
2891 if (typeid == lefttype)
2892 castfunc = InvalidOid; /* simplest case */
2893 else
2894 {
2895 pathtype = find_coercion_pathway(lefttype, typeid,
2896 COERCION_IMPLICIT,
2897 &castfunc);
2898 if (pathtype != COERCION_PATH_FUNC &&
2899 pathtype != COERCION_PATH_RELABELTYPE)
2900 {
2901 /*
2902 * The declared input type of the eq_opr might be a
2903 * polymorphic type such as ANYARRAY or ANYENUM, or other
2904 * special cases such as RECORD; find_coercion_pathway
2905 * currently doesn't subsume these special cases.
2906 */
2907 if (!IsBinaryCoercible(typeid, lefttype))
2908 elog(ERROR, "no conversion function from %s to %s",
2909 format_type_be(typeid),
2910 format_type_be(lefttype));
2911 }
2912 }
2913 if (OidIsValid(castfunc))
2914 fmgr_info_cxt(castfunc, &entry->cast_func_finfo,
2915 TopMemoryContext);
2916 else
2917 entry->cast_func_finfo.fn_oid = InvalidOid;
2918 entry->valid = true;
2919 }
2920
2921 return entry;
2922 }
2923
2924
2925 /*
2926 * Given a trigger function OID, determine whether it is an RI trigger,
2927 * and if so whether it is attached to PK or FK relation.
2928 */
2929 int
RI_FKey_trigger_type(Oid tgfoid)2930 RI_FKey_trigger_type(Oid tgfoid)
2931 {
2932 switch (tgfoid)
2933 {
2934 case F_RI_FKEY_CASCADE_DEL:
2935 case F_RI_FKEY_CASCADE_UPD:
2936 case F_RI_FKEY_RESTRICT_DEL:
2937 case F_RI_FKEY_RESTRICT_UPD:
2938 case F_RI_FKEY_SETNULL_DEL:
2939 case F_RI_FKEY_SETNULL_UPD:
2940 case F_RI_FKEY_SETDEFAULT_DEL:
2941 case F_RI_FKEY_SETDEFAULT_UPD:
2942 case F_RI_FKEY_NOACTION_DEL:
2943 case F_RI_FKEY_NOACTION_UPD:
2944 return RI_TRIGGER_PK;
2945
2946 case F_RI_FKEY_CHECK_INS:
2947 case F_RI_FKEY_CHECK_UPD:
2948 return RI_TRIGGER_FK;
2949 }
2950
2951 return RI_TRIGGER_NONE;
2952 }
2953