1 /*-------------------------------------------------------------------------
2 *
3 * foreign_constraint.c
4 *
5 * This file contains functions to create, alter and drop foreign
6 * constraints on distributed tables.
7 *
8 * Copyright (c) Citus Data, Inc.
9 *
10 *-------------------------------------------------------------------------
11 */
12
13 #include "postgres.h"
14
15 #include "distributed/pg_version_constants.h"
16
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/xact.h"
20 #include "catalog/namespace.h"
21 #include "catalog/pg_constraint.h"
22 #include "access/genam.h"
23 #include "catalog/pg_type.h"
24 #include "distributed/colocation_utils.h"
25 #include "distributed/commands.h"
26 #include "distributed/coordinator_protocol.h"
27 #include "distributed/listutils.h"
28 #include "distributed/coordinator_protocol.h"
29 #include "distributed/multi_join_order.h"
30 #include "distributed/namespace_utils.h"
31 #include "distributed/reference_table_utils.h"
32 #include "distributed/version_compat.h"
33 #include "utils/builtins.h"
34 #include "utils/fmgroids.h"
35 #include "utils/inval.h"
36 #include "utils/lsyscache.h"
37 #include "utils/rel.h"
38 #include "utils/relcache.h"
39 #include "utils/ruleutils.h"
40 #include "utils/syscache.h"
41
42
43 #define BehaviorIsRestrictOrNoAction(x) \
44 ((x) == FKCONSTR_ACTION_NOACTION || (x) == FKCONSTR_ACTION_RESTRICT)
45
46
47 #define USE_CREATE_REFERENCE_TABLE_HINT \
48 "You could use SELECT create_reference_table('%s') " \
49 "to replicate the referenced table to all nodes or " \
50 "consider dropping the foreign key"
51
52
53 typedef bool (*CheckRelationFunc)(Oid);
54
55
56 /* Local functions forward declarations */
57 static void EnsureReferencingTableNotReplicated(Oid referencingTableId);
58 static void EnsureSupportedFKeyOnDistKey(Form_pg_constraint constraintForm);
59 static void EnsureSupportedFKeyBetweenCitusLocalAndRefTable(Form_pg_constraint
60 constraintForm,
61 char
62 referencingReplicationModel,
63 char
64 referencedReplicationModel,
65 Oid referencedTableId);
66 static bool HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple,
67 Oid relationId,
68 int pgConstraintKey,
69 char *columnName);
70 static Oid FindForeignKeyOidWithName(List *foreignKeyOids, const
71 char *inputConstraintName);
72 static void ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
73 Var *referencingDistColumn,
74 Var *referencedDistColumn,
75 int *referencingAttrIndex,
76 int *referencedAttrIndex);
77 static List * GetForeignKeyIdsForColumn(char *columnName, Oid relationId,
78 int searchForeignKeyColumnFlags);
79 static Oid get_relation_constraint_oid_compat(HeapTuple heapTuple);
80 static List * GetForeignKeysWithLocalTables(Oid relationId);
81 static bool IsTableTypeIncluded(Oid relationId, int flags);
82 static void UpdateConstraintIsValid(Oid constraintId, bool isValid);
83
84
85 /*
86 * ConstraintIsAForeignKeyToReferenceTable checks if the given constraint is a
87 * foreign key constraint from the given relation to any reference table.
88 */
89 bool
ConstraintIsAForeignKeyToReferenceTable(char * inputConstaintName,Oid relationId)90 ConstraintIsAForeignKeyToReferenceTable(char *inputConstaintName, Oid relationId)
91 {
92 int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_REFERENCE_TABLES;
93 List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
94
95 Oid foreignKeyOid = FindForeignKeyOidWithName(foreignKeyOids, inputConstaintName);
96
97 return OidIsValid(foreignKeyOid);
98 }
99
100
101 /*
102 * ErrorIfUnsupportedForeignConstraintExists runs checks related to foreign
103 * constraints and errors out if it is not possible to create one of the
104 * foreign constraint in distributed environment.
105 *
106 * To support foreign constraints, we require that;
107 * - If referencing and referenced tables are hash-distributed
108 * - Referencing and referenced tables are co-located.
109 * - Foreign constraint is defined over distribution column.
110 * - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and
111 * ON UPDATE CASCADE options
112 * are not used.
113 * - Replication factors of referencing and referenced table are 1.
114 * - If referenced table is a reference table
115 * - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and
116 * ON UPDATE CASCADE options are not used on the distribution key
117 * of the referencing column.
118 * - If referencing table is a reference table, error out if the referenced
119 * table is a distributed table.
120 * - If referencing table is a reference table and referenced table is a
121 * citus local table:
122 * - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and
123 * ON CASCADE options are not used.
124 * - If referencing or referenced table is distributed table, then the
125 * other table is not a citus local table.
126 */
127 void
ErrorIfUnsupportedForeignConstraintExists(Relation relation,char referencingDistMethod,char referencingReplicationModel,Var * referencingDistKey,uint32 referencingColocationId)128 ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDistMethod,
129 char referencingReplicationModel,
130 Var *referencingDistKey,
131 uint32 referencingColocationId)
132 {
133 Oid referencingTableId = relation->rd_id;
134
135 int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
136 List *foreignKeyOids = GetForeignKeyOids(referencingTableId, flags);
137
138 Oid foreignKeyOid = InvalidOid;
139 foreach_oid(foreignKeyOid, foreignKeyOids)
140 {
141 HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(foreignKeyOid));
142
143 Assert(HeapTupleIsValid(heapTuple));
144
145 Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
146
147 int referencingAttrIndex = -1;
148
149 Var *referencedDistKey = NULL;
150 int referencedAttrIndex = -1;
151 uint32 referencedColocationId = INVALID_COLOCATION_ID;
152
153 Oid referencedTableId = constraintForm->confrelid;
154 bool referencedIsCitus = IsCitusTable(referencedTableId);
155
156 bool selfReferencingTable = (referencingTableId == referencedTableId);
157
158 if (!referencedIsCitus && !selfReferencingTable)
159 {
160 if (IsCitusLocalTableByDistParams(referencingDistMethod,
161 referencingReplicationModel))
162 {
163 ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(referencedTableId);
164 }
165
166 char *referencedTableName = get_rel_name(referencedTableId);
167
168 ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
169 errmsg("referenced table \"%s\" must be a distributed table"
170 " or a reference table",
171 referencedTableName),
172 errdetail("To enforce foreign keys, the referencing and "
173 "referenced rows need to be stored on the same "
174 "node."),
175 errhint(USE_CREATE_REFERENCE_TABLE_HINT,
176 referencedTableName)));
177 }
178
179 /* set referenced table related variables here if table is referencing itself */
180 char referencedDistMethod = 0;
181 char referencedReplicationModel = REPLICATION_MODEL_INVALID;
182 if (!selfReferencingTable)
183 {
184 referencedDistMethod = PartitionMethod(referencedTableId);
185 referencedDistKey = IsCitusTableType(referencedTableId,
186 CITUS_TABLE_WITH_NO_DIST_KEY) ?
187 NULL :
188 DistPartitionKey(referencedTableId);
189 referencedColocationId = TableColocationId(referencedTableId);
190 referencedReplicationModel = TableReplicationModel(referencedTableId);
191 }
192 else
193 {
194 referencedDistMethod = referencingDistMethod;
195 referencedDistKey = referencingDistKey;
196 referencedColocationId = referencingColocationId;
197 referencedReplicationModel = referencingReplicationModel;
198 }
199
200 bool referencingIsCitusLocalOrRefTable =
201 (referencingDistMethod == DISTRIBUTE_BY_NONE);
202 bool referencedIsCitusLocalOrRefTable =
203 (referencedDistMethod == DISTRIBUTE_BY_NONE);
204 if (referencingIsCitusLocalOrRefTable && referencedIsCitusLocalOrRefTable)
205 {
206 EnsureSupportedFKeyBetweenCitusLocalAndRefTable(constraintForm,
207 referencingReplicationModel,
208 referencedReplicationModel,
209 referencedTableId);
210
211 ReleaseSysCache(heapTuple);
212 continue;
213 }
214
215 /*
216 * Foreign keys from citus local tables or reference tables to distributed
217 * tables are not supported.
218 */
219 if (referencingIsCitusLocalOrRefTable && !referencedIsCitusLocalOrRefTable)
220 {
221 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
222 errmsg("cannot create foreign key constraint "
223 "since foreign keys from reference tables "
224 "and local tables to distributed tables "
225 "are not supported"),
226 errdetail("Reference tables and local tables "
227 "can only have foreign keys to reference "
228 "tables and local tables")));
229 }
230
231 /*
232 * To enforce foreign constraints, tables must be co-located unless a
233 * reference table is referenced.
234 */
235 bool referencedIsReferenceTable =
236 (referencedReplicationModel == REPLICATION_MODEL_2PC);
237 if (!referencedIsReferenceTable && (
238 referencingColocationId == INVALID_COLOCATION_ID ||
239 referencingColocationId != referencedColocationId))
240 {
241 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
242 errmsg("cannot create foreign key constraint since "
243 "relations are not colocated or not referencing "
244 "a reference table"),
245 errdetail(
246 "A distributed table can only have foreign keys "
247 "if it is referencing another colocated hash "
248 "distributed table or a reference table")));
249 }
250
251 ForeignConstraintFindDistKeys(heapTuple,
252 referencingDistKey,
253 referencedDistKey,
254 &referencingAttrIndex,
255 &referencedAttrIndex);
256 bool referencingColumnsIncludeDistKey = (referencingAttrIndex != -1);
257 bool foreignConstraintOnDistKey =
258 (referencingColumnsIncludeDistKey && referencingAttrIndex ==
259 referencedAttrIndex);
260
261 /*
262 * If columns in the foreign key includes the distribution key from the
263 * referencing side, we do not allow update/delete operations through
264 * foreign key constraints (e.g. ... ON UPDATE SET NULL)
265 */
266 if (referencingColumnsIncludeDistKey)
267 {
268 EnsureSupportedFKeyOnDistKey(constraintForm);
269 }
270
271 /*
272 * if tables are hash-distributed and colocated, we need to make sure that
273 * the distribution key is included in foreign constraint.
274 */
275 if (!referencedIsCitusLocalOrRefTable && !foreignConstraintOnDistKey)
276 {
277 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
278 errmsg("cannot create foreign key constraint"),
279 errdetail("Foreign keys are supported in two cases, "
280 "either in between two colocated tables including "
281 "partition column in the same ordinal in the both "
282 "tables or from distributed to reference tables")));
283 }
284
285 /*
286 * We do not allow to create foreign constraints if shard replication factor is
287 * greater than 1. Because in our current design, multiple replicas may cause
288 * locking problems and inconsistent shard contents.
289 *
290 * Note that we allow referenced table to be a reference table (e.g., not a
291 * single replicated table). This is allowed since (a) we are sure that
292 * placements always be in the same state (b) executors are aware of reference
293 * tables and handle concurrency related issues accordingly.
294 */
295 EnsureReferencingTableNotReplicated(referencingTableId);
296
297 ReleaseSysCache(heapTuple);
298 }
299 }
300
301
302 /*
303 * EnsureSupportedFKeyBetweenCitusLocalAndRefTable is a helper function that
304 * takes a foreign key constraint form for a foreign key between two citus
305 * tables that are either citus local table or reference table and errors
306 * out if it it an unsupported foreign key from a reference table to a citus
307 * local table according to given replication model parameters.
308 */
309 static void
EnsureSupportedFKeyBetweenCitusLocalAndRefTable(Form_pg_constraint fKeyConstraintForm,char referencingReplicationModel,char referencedReplicationModel,Oid referencedTableId)310 EnsureSupportedFKeyBetweenCitusLocalAndRefTable(Form_pg_constraint fKeyConstraintForm,
311 char referencingReplicationModel,
312 char referencedReplicationModel,
313 Oid referencedTableId)
314 {
315 bool referencingIsReferenceTable =
316 (referencingReplicationModel == REPLICATION_MODEL_2PC);
317 bool referencedIsCitusLocalTable =
318 (referencedReplicationModel != REPLICATION_MODEL_2PC);
319 if (referencingIsReferenceTable && referencedIsCitusLocalTable)
320 {
321 /*
322 * We only support RESTRICT and NO ACTION behaviors for the
323 * foreign keys from reference tables to citus local tables.
324 * This is because, we can't cascade dml operations from citus
325 * local tables's coordinator placement to the remote placements
326 * of the reference table.
327 * Note that for the foreign keys from citus local tables to
328 * reference tables, we support all foreign key behaviors.
329 */
330 if (!(BehaviorIsRestrictOrNoAction(fKeyConstraintForm->confdeltype) &&
331 BehaviorIsRestrictOrNoAction(fKeyConstraintForm->confupdtype)))
332 {
333 char *referencedTableName = get_rel_name(referencedTableId);
334 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
335 errmsg("cannot define foreign key constraint, "
336 "foreign keys from reference tables to "
337 "local tables can only be defined "
338 "with NO ACTION or RESTRICT behaviors"),
339 errhint(USE_CREATE_REFERENCE_TABLE_HINT,
340 referencedTableName)));
341 }
342 }
343 }
344
345
346 /*
347 * EnsureSupportedFKeyOnDistKey errors out if given foreign key constraint form
348 * implies an unsupported ON DELETE/UPDATE behavior assuming the referencing column
349 * is the distribution column of the referencing distributed table.
350 */
351 static void
EnsureSupportedFKeyOnDistKey(Form_pg_constraint fKeyConstraintForm)352 EnsureSupportedFKeyOnDistKey(Form_pg_constraint fKeyConstraintForm)
353 {
354 /*
355 * ON DELETE SET NULL and ON DELETE SET DEFAULT is not supported. Because we do
356 * not want to set partition column to NULL or default value.
357 */
358 if (fKeyConstraintForm->confdeltype == FKCONSTR_ACTION_SETNULL ||
359 fKeyConstraintForm->confdeltype == FKCONSTR_ACTION_SETDEFAULT)
360 {
361 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
362 errmsg("cannot create foreign key constraint"),
363 errdetail("SET NULL or SET DEFAULT is not supported "
364 "in ON DELETE operation when distribution "
365 "key is included in the foreign key constraint")));
366 }
367
368 /*
369 * ON UPDATE SET NULL, ON UPDATE SET DEFAULT and UPDATE CASCADE is not supported.
370 * Because we do not want to set partition column to NULL or default value. Also
371 * cascading update operation would require re-partitioning. Updating partition
372 * column value is not allowed anyway even outside of foreign key concept.
373 */
374 if (fKeyConstraintForm->confupdtype == FKCONSTR_ACTION_SETNULL ||
375 fKeyConstraintForm->confupdtype == FKCONSTR_ACTION_SETDEFAULT ||
376 fKeyConstraintForm->confupdtype == FKCONSTR_ACTION_CASCADE)
377 {
378 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
379 errmsg("cannot create foreign key constraint"),
380 errdetail("SET NULL, SET DEFAULT or CASCADE is not "
381 "supported in ON UPDATE operation when "
382 "distribution key included in the foreign "
383 "constraint.")));
384 }
385 }
386
387
388 /*
389 * EnsureReferencingTableNotReplicated takes referencingTableId for the
390 * referencing table of the foreign key and errors out if it's not a single
391 * replicated table.
392 */
393 static void
EnsureReferencingTableNotReplicated(Oid referencingTableId)394 EnsureReferencingTableNotReplicated(Oid referencingTableId)
395 {
396 bool referencingNotReplicated = true;
397 bool referencingIsCitus = IsCitusTable(referencingTableId);
398
399 if (referencingIsCitus)
400 {
401 /* ALTER TABLE command is applied over single replicated table */
402 referencingNotReplicated = SingleReplicatedTable(referencingTableId);
403 }
404 else
405 {
406 /* Creating single replicated table with foreign constraint */
407 referencingNotReplicated = !DistributedTableReplicationIsEnabled();
408 }
409
410 if (!referencingNotReplicated)
411 {
412 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
413 errmsg("cannot create foreign key constraint"),
414 errdetail("Citus Community Edition currently supports "
415 "foreign key constraints only for "
416 "\"citus.shard_replication_factor = 1\"."),
417 errhint("Please change \"citus.shard_replication_factor to "
418 "1\". To learn more about using foreign keys with "
419 "other replication factors, please contact us at "
420 "https://citusdata.com/about/contact_us.")));
421 }
422 }
423
424
425 /*
426 * ErrorOutForFKeyBetweenPostgresAndCitusLocalTable is a helper function to
427 * error out for foreign keys between postgres local tables and citus local
428 * tables.
429 */
430 void
ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(Oid localTableId)431 ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(Oid localTableId)
432 {
433 char *localTableName = get_rel_name(localTableId);
434 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
435 errmsg("cannot create foreign key constraint as \"%s\" is "
436 "a postgres local table", localTableName),
437 errhint("first add local table to citus metadata "
438 "by using SELECT citus_add_local_table_to_metadata('%s') "
439 "and execute the ALTER TABLE command to create the "
440 "foreign key to local table", localTableName)));
441 }
442
443
444 /*
445 * ForeignConstraintFindDistKeys finds the index of the given distribution columns
446 * in the given foreign key constraint and returns them in referencingAttrIndex
447 * and referencedAttrIndex. If one of them is not found, it returns -1 instead.
448 */
449 static void
ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,Var * referencingDistColumn,Var * referencedDistColumn,int * referencingAttrIndex,int * referencedAttrIndex)450 ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
451 Var *referencingDistColumn,
452 Var *referencedDistColumn,
453 int *referencingAttrIndex,
454 int *referencedAttrIndex)
455 {
456 Datum *referencingColumnArray = NULL;
457 int referencingColumnCount = 0;
458 Datum *referencedColumnArray = NULL;
459 int referencedColumnCount = 0;
460 bool isNull = false;
461
462 *referencedAttrIndex = -1;
463 *referencedAttrIndex = -1;
464
465 /*
466 * Column attributes are not available in Form_pg_constraint, therefore we need
467 * to find them in the system catalog. After finding them, we iterate over column
468 * attributes together because partition column must be at the same place in both
469 * referencing and referenced side of the foreign key constraint.
470 */
471 Datum referencingColumnsDatum = SysCacheGetAttr(CONSTROID, pgConstraintTuple,
472 Anum_pg_constraint_conkey, &isNull);
473 Datum referencedColumnsDatum = SysCacheGetAttr(CONSTROID, pgConstraintTuple,
474 Anum_pg_constraint_confkey, &isNull);
475
476 deconstruct_array(DatumGetArrayTypeP(referencingColumnsDatum), INT2OID, 2, true,
477 's', &referencingColumnArray, NULL, &referencingColumnCount);
478 deconstruct_array(DatumGetArrayTypeP(referencedColumnsDatum), INT2OID, 2, true,
479 's', &referencedColumnArray, NULL, &referencedColumnCount);
480
481 Assert(referencingColumnCount == referencedColumnCount);
482
483 for (int attrIdx = 0; attrIdx < referencingColumnCount; ++attrIdx)
484 {
485 AttrNumber referencingAttrNo = DatumGetInt16(referencingColumnArray[attrIdx]);
486 AttrNumber referencedAttrNo = DatumGetInt16(referencedColumnArray[attrIdx]);
487
488 if (referencedDistColumn != NULL &&
489 referencedDistColumn->varattno == referencedAttrNo)
490 {
491 *referencedAttrIndex = attrIdx;
492 }
493
494 if (referencingDistColumn != NULL &&
495 referencingDistColumn->varattno == referencingAttrNo)
496 {
497 *referencingAttrIndex = attrIdx;
498 }
499 }
500 }
501
502
503 /*
504 * ColumnAppearsInForeignKey returns true if there is a foreign key constraint
505 * from/to given column. False otherwise.
506 */
507 bool
ColumnAppearsInForeignKey(char * columnName,Oid relationId)508 ColumnAppearsInForeignKey(char *columnName, Oid relationId)
509 {
510 int searchForeignKeyColumnFlags = SEARCH_REFERENCING_RELATION |
511 SEARCH_REFERENCED_RELATION;
512 List *foreignKeysColumnAppeared =
513 GetForeignKeyIdsForColumn(columnName, relationId, searchForeignKeyColumnFlags);
514 return list_length(foreignKeysColumnAppeared) > 0;
515 }
516
517
518 /*
519 * ColumnAppearsInForeignKeyToReferenceTable checks if there is a foreign key
520 * constraint from/to any reference table on the given column.
521 */
522 bool
ColumnAppearsInForeignKeyToReferenceTable(char * columnName,Oid relationId)523 ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
524 {
525 int searchForeignKeyColumnFlags = SEARCH_REFERENCING_RELATION |
526 SEARCH_REFERENCED_RELATION;
527 List *foreignKeyIdsColumnAppeared =
528 GetForeignKeyIdsForColumn(columnName, relationId, searchForeignKeyColumnFlags);
529
530 Oid foreignKeyId = InvalidOid;
531 foreach_oid(foreignKeyId, foreignKeyIdsColumnAppeared)
532 {
533 Oid referencedTableId = GetReferencedTableId(foreignKeyId);
534 if (IsCitusTableType(referencedTableId, REFERENCE_TABLE))
535 {
536 return true;
537 }
538 }
539
540 return false;
541 }
542
543
544 /*
545 * GetForeignKeyIdsForColumn takes columnName and relationId for the owning
546 * relation, and returns a list of OIDs for foreign constraints that the column
547 * with columnName is involved according to "searchForeignKeyColumnFlags" argument.
548 * See SearchForeignKeyColumnFlags enum definition for usage.
549 */
550 static List *
GetForeignKeyIdsForColumn(char * columnName,Oid relationId,int searchForeignKeyColumnFlags)551 GetForeignKeyIdsForColumn(char *columnName, Oid relationId,
552 int searchForeignKeyColumnFlags)
553 {
554 bool searchReferencing = searchForeignKeyColumnFlags & SEARCH_REFERENCING_RELATION;
555 bool searchReferenced = searchForeignKeyColumnFlags & SEARCH_REFERENCED_RELATION;
556
557 /* at least one of them should be true */
558 Assert(searchReferencing || searchReferenced);
559
560 List *foreignKeyIdsColumnAppeared = NIL;
561
562 ScanKeyData scanKey[1];
563 int scanKeyCount = 1;
564
565 Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
566
567 ScanKeyInit(&scanKey[0], Anum_pg_constraint_contype, BTEqualStrategyNumber,
568 F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
569
570 SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, InvalidOid, false,
571 NULL, scanKeyCount, scanKey);
572
573 HeapTuple heapTuple = systable_getnext(scanDescriptor);
574 while (HeapTupleIsValid(heapTuple))
575 {
576 int pgConstraintKey = 0;
577 Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
578
579 Oid referencedTableId = constraintForm->confrelid;
580 Oid referencingTableId = constraintForm->conrelid;
581
582 if (referencedTableId == relationId && searchReferenced)
583 {
584 pgConstraintKey = Anum_pg_constraint_confkey;
585 }
586 else if (referencingTableId == relationId && searchReferencing)
587 {
588 pgConstraintKey = Anum_pg_constraint_conkey;
589 }
590 else
591 {
592 /*
593 * If the constraint is not from/to the given relation, we should simply
594 * skip.
595 */
596 heapTuple = systable_getnext(scanDescriptor);
597 continue;
598 }
599
600 if (HeapTupleOfForeignConstraintIncludesColumn(heapTuple, relationId,
601 pgConstraintKey, columnName))
602 {
603 Oid foreignKeyOid = get_relation_constraint_oid_compat(heapTuple);
604 foreignKeyIdsColumnAppeared = lappend_oid(foreignKeyIdsColumnAppeared,
605 foreignKeyOid);
606 }
607
608 heapTuple = systable_getnext(scanDescriptor);
609 }
610
611 /* clean up scan and close system catalog */
612 systable_endscan(scanDescriptor);
613 table_close(pgConstraint, NoLock);
614
615 return foreignKeyIdsColumnAppeared;
616 }
617
618
619 /*
620 * GetReferencingForeignConstaintCommands takes in a relationId, and
621 * returns the list of foreign constraint commands needed to reconstruct
622 * foreign key constraints that the table is involved in as the "referencing"
623 * one.
624 */
625 List *
GetReferencingForeignConstaintCommands(Oid relationId)626 GetReferencingForeignConstaintCommands(Oid relationId)
627 {
628 int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
629 return GetForeignConstraintCommandsInternal(relationId, flags);
630 }
631
632
633 /*
634 * GetForeignConstraintToReferenceTablesCommands takes in a relationId, and
635 * returns the list of foreign constraint commands needed to reconstruct
636 * foreign key constraints that the table is involved in as the "referencing"
637 * one and the "referenced" table is a reference table.
638 */
639 List *
GetForeignConstraintToReferenceTablesCommands(Oid relationId)640 GetForeignConstraintToReferenceTablesCommands(Oid relationId)
641 {
642 int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_REFERENCE_TABLES;
643 return GetForeignConstraintCommandsInternal(relationId, flags);
644 }
645
646
647 /*
648 * GetForeignConstraintToDistributedTablesCommands takes in a relationId, and
649 * returns the list of foreign constraint commands needed to reconstruct
650 * foreign key constraints that the table is involved in as the "referencing"
651 * one and the "referenced" table is a distributed table.
652 */
653 List *
GetForeignConstraintToDistributedTablesCommands(Oid relationId)654 GetForeignConstraintToDistributedTablesCommands(Oid relationId)
655 {
656 int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_DISTRIBUTED_TABLES;
657 return GetForeignConstraintCommandsInternal(relationId, flags);
658 }
659
660
661 /*
662 * GetForeignConstraintFromDistributedTablesCommands takes in a relationId, and
663 * returns the list of foreign constraint commands needed to reconstruct
664 * foreign key constraints that the table is involved in as the "referenced"
665 * one and the "referencing" table is a distributed table.
666 */
667 List *
GetForeignConstraintFromDistributedTablesCommands(Oid relationId)668 GetForeignConstraintFromDistributedTablesCommands(Oid relationId)
669 {
670 int flags = INCLUDE_REFERENCED_CONSTRAINTS | INCLUDE_DISTRIBUTED_TABLES;
671 return GetForeignConstraintCommandsInternal(relationId, flags);
672 }
673
674
675 /*
676 * GetForeignConstraintCommandsInternal is a wrapper function to get the
677 * DDL commands to recreate the foreign key constraints returned by
678 * GetForeignKeyOids. See more details at the underlying function.
679 */
680 List *
GetForeignConstraintCommandsInternal(Oid relationId,int flags)681 GetForeignConstraintCommandsInternal(Oid relationId, int flags)
682 {
683 List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
684
685 List *foreignKeyCommands = NIL;
686
687 PushOverrideEmptySearchPath(CurrentMemoryContext);
688
689 Oid foreignKeyOid = InvalidOid;
690 foreach_oid(foreignKeyOid, foreignKeyOids)
691 {
692 char *statementDef = pg_get_constraintdef_command(foreignKeyOid);
693
694 foreignKeyCommands = lappend(foreignKeyCommands, statementDef);
695 }
696
697 /* revert back to original search_path */
698 PopOverrideSearchPath();
699
700 return foreignKeyCommands;
701 }
702
703
704 /*
705 * get_relation_constraint_oid_compat returns OID of the constraint represented
706 * by the constraintForm, which is passed as an heapTuple. OID of the contraint
707 * is already stored in the constraintForm struct if major PostgreSQL version is
708 * 12. However, in the older versions, we should utilize HeapTupleGetOid to deduce
709 * that OID with no cost.
710 */
711 static Oid
get_relation_constraint_oid_compat(HeapTuple heapTuple)712 get_relation_constraint_oid_compat(HeapTuple heapTuple)
713 {
714 Assert(heapTuple != NULL);
715
716
717 Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
718 Oid constraintOid = constraintForm->oid;
719
720 return constraintOid;
721 }
722
723
724 /*
725 * HasForeignKeyToLocalTable returns true if relation has foreign key
726 * relationship with a local table.
727 */
728 bool
HasForeignKeyWithLocalTable(Oid relationId)729 HasForeignKeyWithLocalTable(Oid relationId)
730 {
731 List *foreignKeysWithLocalTables = GetForeignKeysWithLocalTables(relationId);
732 return list_length(foreignKeysWithLocalTables) > 0;
733 }
734
735
736 /*
737 * GetForeignKeysWithLocalTables returns a list of foreign keys for foreign key
738 * relationships that relation has with local tables.
739 */
740 static List *
GetForeignKeysWithLocalTables(Oid relationId)741 GetForeignKeysWithLocalTables(Oid relationId)
742 {
743 int referencingFKeysFlag = INCLUDE_REFERENCING_CONSTRAINTS |
744 INCLUDE_LOCAL_TABLES;
745 List *referencingFKeyList = GetForeignKeyOids(relationId, referencingFKeysFlag);
746
747 /* already captured self referencing foreign keys, so use EXCLUDE_SELF_REFERENCES */
748 int referencedFKeysFlag = INCLUDE_REFERENCED_CONSTRAINTS |
749 EXCLUDE_SELF_REFERENCES |
750 INCLUDE_LOCAL_TABLES;
751 List *referencedFKeyList = GetForeignKeyOids(relationId, referencedFKeysFlag);
752 return list_concat(referencingFKeyList, referencedFKeyList);
753 }
754
755
756 /*
757 * GetForeignKeysFromLocalTables returns a list of foreign keys where the referencing
758 * relation is a local table.
759 */
760 List *
GetForeignKeysFromLocalTables(Oid relationId)761 GetForeignKeysFromLocalTables(Oid relationId)
762 {
763 int referencedFKeysFlag = INCLUDE_REFERENCED_CONSTRAINTS |
764 INCLUDE_LOCAL_TABLES;
765 List *referencingFKeyList = GetForeignKeyOids(relationId, referencedFKeysFlag);
766
767 return referencingFKeyList;
768 }
769
770
771 /*
772 * HasForeignKeyToCitusLocalTable returns true if any of the foreign key constraints
773 * on the relation with relationId references to a citus local table.
774 */
775 bool
HasForeignKeyToCitusLocalTable(Oid relationId)776 HasForeignKeyToCitusLocalTable(Oid relationId)
777 {
778 int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_CITUS_LOCAL_TABLES;
779 List *foreignKeyOidList = GetForeignKeyOids(relationId, flags);
780 return list_length(foreignKeyOidList) > 0;
781 }
782
783
784 /*
785 * HasForeignKeyToReferenceTable returns true if any of the foreign key
786 * constraints on the relation with relationId references to a reference
787 * table.
788 */
789 bool
HasForeignKeyToReferenceTable(Oid relationId)790 HasForeignKeyToReferenceTable(Oid relationId)
791 {
792 int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_REFERENCE_TABLES;
793 List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
794
795 return list_length(foreignKeyOids) > 0;
796 }
797
798
799 /*
800 * TableReferenced function checks whether given table is referenced by another table
801 * via foreign constraints. If it is referenced, this function returns true.
802 */
803 bool
TableReferenced(Oid relationId)804 TableReferenced(Oid relationId)
805 {
806 int flags = INCLUDE_REFERENCED_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
807 List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
808
809 return list_length(foreignKeyOids) > 0;
810 }
811
812
813 /*
814 * HeapTupleOfForeignConstraintIncludesColumn fetches the columns from the foreign
815 * constraint and checks if the given column name matches one of them.
816 */
817 static bool
HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple,Oid relationId,int pgConstraintKey,char * columnName)818 HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple, Oid relationId,
819 int pgConstraintKey, char *columnName)
820 {
821 Datum *columnArray = NULL;
822 int columnCount = 0;
823 bool isNull = false;
824
825 Datum columnsDatum = SysCacheGetAttr(CONSTROID, heapTuple, pgConstraintKey, &isNull);
826 deconstruct_array(DatumGetArrayTypeP(columnsDatum), INT2OID, 2, true,
827 's', &columnArray, NULL, &columnCount);
828
829 for (int attrIdx = 0; attrIdx < columnCount; ++attrIdx)
830 {
831 AttrNumber attrNo = DatumGetInt16(columnArray[attrIdx]);
832
833 char *colName = get_attname(relationId, attrNo, false);
834 if (strncmp(colName, columnName, NAMEDATALEN) == 0)
835 {
836 return true;
837 }
838 }
839
840 return false;
841 }
842
843
844 /*
845 * TableReferencing function checks whether given table is referencing to another
846 * table via foreign key constraints. If it is referencing, this function returns
847 * true.
848 */
849 bool
TableReferencing(Oid relationId)850 TableReferencing(Oid relationId)
851 {
852 int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
853 List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
854
855 return list_length(foreignKeyOids) > 0;
856 }
857
858
859 /*
860 * ConstraintWithNameIsOfType is a wrapper around ConstraintWithNameIsOfType that returns true
861 * if given constraint name identifies a uniqueness constraint, i.e:
862 * - primary key constraint, or
863 * - unique constraint
864 */
865 bool
ConstraintIsAUniquenessConstraint(char * inputConstaintName,Oid relationId)866 ConstraintIsAUniquenessConstraint(char *inputConstaintName, Oid relationId)
867 {
868 bool isUniqueConstraint = ConstraintWithNameIsOfType(inputConstaintName, relationId,
869 CONSTRAINT_UNIQUE);
870 bool isPrimaryConstraint = ConstraintWithNameIsOfType(inputConstaintName, relationId,
871 CONSTRAINT_PRIMARY);
872 return isUniqueConstraint || isPrimaryConstraint;
873 }
874
875
876 /*
877 * ConstraintIsAForeignKey is a wrapper around ConstraintWithNameIsOfType that returns true
878 * if given constraint name identifies a foreign key constraint.
879 */
880 bool
ConstraintIsAForeignKey(char * inputConstaintName,Oid relationId)881 ConstraintIsAForeignKey(char *inputConstaintName, Oid relationId)
882 {
883 return ConstraintWithNameIsOfType(inputConstaintName, relationId, CONSTRAINT_FOREIGN);
884 }
885
886
887 /*
888 * ConstraintWithNameIsOfType is a wrapper around get_relation_constraint_oid that
889 * returns true if given constraint name identifies a valid constraint defined
890 * on relation with relationId and it's type matches the input constraint type.
891 */
892 bool
ConstraintWithNameIsOfType(char * inputConstaintName,Oid relationId,char targetConstraintType)893 ConstraintWithNameIsOfType(char *inputConstaintName, Oid relationId,
894 char targetConstraintType)
895 {
896 bool missingOk = true;
897 Oid constraintId =
898 get_relation_constraint_oid(relationId, inputConstaintName, missingOk);
899 return ConstraintWithIdIsOfType(constraintId, targetConstraintType);
900 }
901
902
903 /*
904 * ConstraintWithIdIsOfType returns true if constraint with constraintId exists
905 * and is of type targetConstraintType.
906 */
907 bool
ConstraintWithIdIsOfType(Oid constraintId,char targetConstraintType)908 ConstraintWithIdIsOfType(Oid constraintId, char targetConstraintType)
909 {
910 HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraintId));
911 if (!HeapTupleIsValid(heapTuple))
912 {
913 /* no such constraint */
914 return false;
915 }
916
917 Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
918 char constraintType = constraintForm->contype;
919 bool constraintTypeMatches = (constraintType == targetConstraintType);
920
921 ReleaseSysCache(heapTuple);
922
923 return constraintTypeMatches;
924 }
925
926
927 /*
928 * FindForeignKeyOidWithName searches the foreign key constraint with
929 * inputConstraintName in the given list of foreign key constraint OIDs.
930 * Returns the OID of the matching constraint. If there no matching constraint
931 * in the given list, then returns InvalidOid.
932 */
933 static Oid
FindForeignKeyOidWithName(List * foreignKeyOids,const char * inputConstraintName)934 FindForeignKeyOidWithName(List *foreignKeyOids, const char *inputConstraintName)
935 {
936 Oid foreignKeyOid = InvalidOid;
937 foreach_oid(foreignKeyOid, foreignKeyOids)
938 {
939 char *constraintName = get_constraint_name(foreignKeyOid);
940
941 Assert(constraintName != NULL);
942
943 if (strncmp(constraintName, inputConstraintName, NAMEDATALEN) == 0)
944 {
945 return foreignKeyOid;
946 }
947 }
948
949 return InvalidOid;
950 }
951
952
953 /*
954 * TableHasExternalForeignKeys returns true if the relation with relationId is
955 * involved in a foreign key relationship other than the self-referencing ones.
956 */
957 bool
TableHasExternalForeignKeys(Oid relationId)958 TableHasExternalForeignKeys(Oid relationId)
959 {
960 int flags = (INCLUDE_REFERENCING_CONSTRAINTS | EXCLUDE_SELF_REFERENCES |
961 INCLUDE_ALL_TABLE_TYPES);
962 List *foreignKeyIdsTableReferencing = GetForeignKeyOids(relationId, flags);
963
964 flags = (INCLUDE_REFERENCED_CONSTRAINTS | EXCLUDE_SELF_REFERENCES |
965 INCLUDE_ALL_TABLE_TYPES);
966 List *foreignKeyIdsTableReferenced = GetForeignKeyOids(relationId, flags);
967
968 List *foreignKeysWithOtherTables = list_concat(foreignKeyIdsTableReferencing,
969 foreignKeyIdsTableReferenced);
970
971 if (list_length(foreignKeysWithOtherTables) == 0)
972 {
973 return false;
974 }
975
976 return true;
977 }
978
979
980 /*
981 * GetForeignKeyOids takes in a relationId, and returns a list of OIDs for
982 * foreign constraints that the relation with relationId is involved according
983 * to "flags" argument. See ExtractForeignKeyConstraintsMode enum definition
984 * for usage of the flags.
985 */
986 List *
GetForeignKeyOids(Oid relationId,int flags)987 GetForeignKeyOids(Oid relationId, int flags)
988 {
989 AttrNumber pgConstraintTargetAttrNumber = InvalidAttrNumber;
990
991 bool extractReferencing = (flags & INCLUDE_REFERENCING_CONSTRAINTS);
992 bool extractReferenced = (flags & INCLUDE_REFERENCED_CONSTRAINTS);
993
994 /*
995 * Only one of them should be passed at a time since the way we scan
996 * pg_constraint differs for those columns. Anum_pg_constraint_conrelid
997 * supports index scan while Anum_pg_constraint_confrelid does not.
998 */
999 Assert(!(extractReferencing && extractReferenced));
1000 Assert(extractReferencing || extractReferenced);
1001
1002 bool useIndex = false;
1003 Oid indexOid = InvalidOid;
1004
1005 if (extractReferencing)
1006 {
1007 pgConstraintTargetAttrNumber = Anum_pg_constraint_conrelid;
1008
1009 useIndex = true;
1010 indexOid = ConstraintRelidTypidNameIndexId;
1011 }
1012 else if (extractReferenced)
1013 {
1014 pgConstraintTargetAttrNumber = Anum_pg_constraint_confrelid;
1015 }
1016
1017 bool excludeSelfReference = (flags & EXCLUDE_SELF_REFERENCES);
1018
1019 List *foreignKeyOids = NIL;
1020
1021 ScanKeyData scanKey[1];
1022 int scanKeyCount = 1;
1023
1024 Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
1025 ScanKeyInit(&scanKey[0], pgConstraintTargetAttrNumber,
1026 BTEqualStrategyNumber, F_OIDEQ, relationId);
1027 SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, indexOid, useIndex,
1028 NULL, scanKeyCount, scanKey);
1029
1030 HeapTuple heapTuple = systable_getnext(scanDescriptor);
1031 while (HeapTupleIsValid(heapTuple))
1032 {
1033 Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
1034
1035 if (constraintForm->contype != CONSTRAINT_FOREIGN)
1036 {
1037 heapTuple = systable_getnext(scanDescriptor);
1038 continue;
1039 }
1040
1041 bool inheritedConstraint = OidIsValid(constraintForm->conparentid);
1042 if (inheritedConstraint)
1043 {
1044 /*
1045 * We only consider the constraints that are explicitly created on
1046 * the table as we already process the constraints from parent tables
1047 * implicitly when a command is issued
1048 */
1049 heapTuple = systable_getnext(scanDescriptor);
1050 continue;
1051 }
1052
1053 Oid constraintId = get_relation_constraint_oid_compat(heapTuple);
1054
1055 bool isSelfReference = (constraintForm->conrelid == constraintForm->confrelid);
1056 if (excludeSelfReference && isSelfReference)
1057 {
1058 heapTuple = systable_getnext(scanDescriptor);
1059 continue;
1060 }
1061
1062 Oid otherTableId = InvalidOid;
1063 if (extractReferencing)
1064 {
1065 otherTableId = constraintForm->confrelid;
1066 }
1067 else if (extractReferenced)
1068 {
1069 otherTableId = constraintForm->conrelid;
1070 }
1071
1072 if (!IsTableTypeIncluded(otherTableId, flags))
1073 {
1074 heapTuple = systable_getnext(scanDescriptor);
1075 continue;
1076 }
1077
1078 foreignKeyOids = lappend_oid(foreignKeyOids, constraintId);
1079
1080 heapTuple = systable_getnext(scanDescriptor);
1081 }
1082
1083 systable_endscan(scanDescriptor);
1084
1085 /*
1086 * Do not release AccessShareLock yet to prevent modifications to be done
1087 * on pg_constraint to make sure that caller will process valid foreign key
1088 * constraints through the transaction.
1089 */
1090 table_close(pgConstraint, NoLock);
1091
1092 return foreignKeyOids;
1093 }
1094
1095
1096 /*
1097 * GetReferencedTableId returns OID of the referenced relation for the foreign
1098 * key with foreignKeyId. If there is no such foreign key, then this function
1099 * returns InvalidOid.
1100 */
1101 Oid
GetReferencedTableId(Oid foreignKeyId)1102 GetReferencedTableId(Oid foreignKeyId)
1103 {
1104 HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(foreignKeyId));
1105 if (!HeapTupleIsValid(heapTuple))
1106 {
1107 /* no such foreign key */
1108 return InvalidOid;
1109 }
1110
1111 Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
1112 Oid referencedTableId = constraintForm->confrelid;
1113
1114 ReleaseSysCache(heapTuple);
1115
1116 return referencedTableId;
1117 }
1118
1119
1120 /*
1121 * GetReferencingTableId returns OID of the referencing relation for the foreign
1122 * key with foreignKeyId. If there is no such foreign key, then this function
1123 * returns InvalidOid.
1124 */
1125 Oid
GetReferencingTableId(Oid foreignKeyId)1126 GetReferencingTableId(Oid foreignKeyId)
1127 {
1128 HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(foreignKeyId));
1129 if (!HeapTupleIsValid(heapTuple))
1130 {
1131 /* no such foreign key */
1132 return InvalidOid;
1133 }
1134
1135 Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
1136 Oid referencingTableId = constraintForm->conrelid;
1137
1138 ReleaseSysCache(heapTuple);
1139
1140 return referencingTableId;
1141 }
1142
1143
1144 /*
1145 * IsTableTypeIncluded returns true if type of the table with relationId (distributed,
1146 * reference, Citus local or Postgres local) is included in the flags, false if not
1147 */
1148 static bool
IsTableTypeIncluded(Oid relationId,int flags)1149 IsTableTypeIncluded(Oid relationId, int flags)
1150 {
1151 if (!IsCitusTable(relationId))
1152 {
1153 return (flags & INCLUDE_LOCAL_TABLES) != 0;
1154 }
1155 else if (IsCitusTableType(relationId, DISTRIBUTED_TABLE))
1156 {
1157 return (flags & INCLUDE_DISTRIBUTED_TABLES) != 0;
1158 }
1159 else if (IsCitusTableType(relationId, REFERENCE_TABLE))
1160 {
1161 return (flags & INCLUDE_REFERENCE_TABLES) != 0;
1162 }
1163 else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
1164 {
1165 return (flags & INCLUDE_CITUS_LOCAL_TABLES) != 0;
1166 }
1167 return false;
1168 }
1169
1170
1171 /*
1172 * GetForeignConstraintCommandsToReferenceTable takes in a shardInterval, and
1173 * returns the list of commands that are required to create the foreign
1174 * constraints for that shardInterval.
1175 *
1176 * The function does the following hack:
1177 * - Create the foreign constraints as INVALID on the shards
1178 * - Manually update pg_constraint to mark the same foreign
1179 * constraints as VALID
1180 *
1181 * We implement the above hack because we aim to skip the validation phase
1182 * of foreign keys to reference tables. The validation is pretty costly and
1183 * given that the source placements already valid, the validation in the
1184 * target nodes is useless.
1185 *
1186 * The function does not apply the same logic for the already invalid foreign
1187 * constraints.
1188 */
1189 List *
GetForeignConstraintCommandsToReferenceTable(ShardInterval * shardInterval)1190 GetForeignConstraintCommandsToReferenceTable(ShardInterval *shardInterval)
1191 {
1192 ScanKeyData scanKey[1];
1193 int scanKeyCount = 1;
1194 uint64 shardId = shardInterval->shardId;
1195 Oid relationId = shardInterval->relationId;
1196
1197 List *commandList = NIL;
1198
1199 /*
1200 * Set search_path to NIL so that all objects outside of pg_catalog will be
1201 * schema-prefixed. pg_catalog will be added automatically when we call
1202 * PushOverrideSearchPath(), since we set addCatalog to true;
1203 */
1204 OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
1205 overridePath->schemas = NIL;
1206 overridePath->addCatalog = true;
1207 PushOverrideSearchPath(overridePath);
1208
1209 /* open system catalog and scan all constraints that belong to this table */
1210 Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
1211 ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
1212 relationId);
1213
1214 SysScanDesc scanDescriptor = systable_beginscan(pgConstraint,
1215 ConstraintRelidTypidNameIndexId,
1216 true, NULL, scanKeyCount, scanKey);
1217
1218 HeapTuple heapTuple = systable_getnext(scanDescriptor);
1219 while (HeapTupleIsValid(heapTuple))
1220 {
1221 Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
1222 char *constraintDefinition = NULL;
1223
1224
1225 if (constraintForm->contype != CONSTRAINT_FOREIGN)
1226 {
1227 heapTuple = systable_getnext(scanDescriptor);
1228 continue;
1229 }
1230
1231 Oid referencedRelationId = constraintForm->confrelid;
1232 if (PartitionMethod(referencedRelationId) != DISTRIBUTE_BY_NONE)
1233 {
1234 heapTuple = systable_getnext(scanDescriptor);
1235 continue;
1236 }
1237
1238 Oid constraintId = get_relation_constraint_oid(relationId,
1239 constraintForm->conname.data,
1240 true);
1241
1242 int64 referencedShardId = GetFirstShardId(referencedRelationId);
1243 Oid referencedSchemaId = get_rel_namespace(referencedRelationId);
1244 char *referencedSchemaName = get_namespace_name(referencedSchemaId);
1245 char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName);
1246
1247 Oid schemaId = get_rel_namespace(relationId);
1248 char *schemaName = get_namespace_name(schemaId);
1249 char *escapedSchemaName = quote_literal_cstr(schemaName);
1250
1251 /*
1252 * We're first marking the constraint's valid field as invalid
1253 * and get the constraint definition. Later, we mark the constraint
1254 * as valid back with directly updating to pg_constraint.
1255 */
1256 if (constraintForm->convalidated == true)
1257 {
1258 UpdateConstraintIsValid(constraintId, false);
1259 constraintDefinition = pg_get_constraintdef_command(constraintId);
1260 UpdateConstraintIsValid(constraintId, true);
1261 }
1262 else
1263 {
1264 /* if the constraint is not valid, simply do nothing special */
1265 constraintDefinition = pg_get_constraintdef_command(constraintId);
1266 }
1267
1268 StringInfo applyForeignConstraintCommand = makeStringInfo();
1269 appendStringInfo(applyForeignConstraintCommand,
1270 WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId,
1271 escapedSchemaName, referencedShardId,
1272 escapedReferencedSchemaName,
1273 quote_literal_cstr(constraintDefinition));
1274 commandList = lappend(commandList, applyForeignConstraintCommand->data);
1275
1276 /* mark the constraint as valid again on the shard */
1277 if (constraintForm->convalidated == true)
1278 {
1279 StringInfo markConstraintValid = makeStringInfo();
1280 char *qualifiedReferencingShardName =
1281 ConstructQualifiedShardName(shardInterval);
1282
1283 char *shardConstraintName = pstrdup(constraintForm->conname.data);
1284 AppendShardIdToName(&shardConstraintName, shardId);
1285
1286 appendStringInfo(markConstraintValid,
1287 "UPDATE pg_constraint SET convalidated = true WHERE "
1288 "conrelid = %s::regclass AND conname = '%s'",
1289 quote_literal_cstr(qualifiedReferencingShardName),
1290 shardConstraintName);
1291 commandList = lappend(commandList, markConstraintValid->data);
1292 }
1293
1294 heapTuple = systable_getnext(scanDescriptor);
1295 }
1296
1297 /* clean up scan and close system catalog */
1298 systable_endscan(scanDescriptor);
1299 table_close(pgConstraint, AccessShareLock);
1300
1301 /* revert back to original search_path */
1302 PopOverrideSearchPath();
1303
1304 return commandList;
1305 }
1306
1307
1308 /*
1309 * UpdateConstraintIsValid is a utility function with sets the
1310 * pg_constraint.convalidated to the given isValid for the given
1311 * constraintId.
1312 *
1313 * This function should be called with caution because if used wrong
1314 * could lead to data inconsistencies.
1315 */
1316 static void
UpdateConstraintIsValid(Oid constraintId,bool isValid)1317 UpdateConstraintIsValid(Oid constraintId, bool isValid)
1318 {
1319 ScanKeyData scankey[1];
1320 Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
1321 TupleDesc tupleDescriptor = RelationGetDescr(pgConstraint);
1322 Datum values[Natts_pg_constraint];
1323 bool isnull[Natts_pg_constraint];
1324 bool replace[Natts_pg_constraint];
1325
1326 ScanKeyInit(&scankey[0],
1327 Anum_pg_constraint_oid,
1328 BTEqualStrategyNumber, F_OIDEQ,
1329 ObjectIdGetDatum(constraintId));
1330
1331 SysScanDesc scanDescriptor = systable_beginscan(pgConstraint,
1332 ConstraintOidIndexId,
1333 true,
1334 NULL,
1335 1,
1336 scankey);
1337 HeapTuple heapTuple = systable_getnext(scanDescriptor);
1338 if (!HeapTupleIsValid(heapTuple))
1339 {
1340 elog(ERROR, "could not find tuple for constraint %u", constraintId);
1341 }
1342
1343 memset(replace, 0, sizeof(replace));
1344
1345 values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValid);
1346 isnull[Anum_pg_constraint_convalidated - 1] = false;
1347 replace[Anum_pg_constraint_convalidated - 1] = true;
1348
1349 heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
1350
1351 CatalogTupleUpdate(pgConstraint, &heapTuple->t_self, heapTuple);
1352
1353 CacheInvalidateHeapTuple(pgConstraint, heapTuple, NULL);
1354 CommandCounterIncrement();
1355
1356 systable_endscan(scanDescriptor);
1357 table_close(pgConstraint, NoLock);
1358 }
1359
1360
1361 /*
1362 * RelationInvolvedInAnyNonInheritedForeignKeys returns true if relation involved
1363 * in a foreign key that is not inherited from its parent relation.
1364 */
1365 bool
RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId)1366 RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId)
1367 {
1368 List *referencingForeignKeys = GetForeignKeyOids(relationId,
1369 INCLUDE_REFERENCING_CONSTRAINTS |
1370 INCLUDE_ALL_TABLE_TYPES);
1371
1372 /*
1373 * We already capture self-referencing foreign keys above, so use
1374 * EXCLUDE_SELF_REFERENCES here
1375 */
1376 List *referencedForeignKeys = GetForeignKeyOids(relationId,
1377 INCLUDE_REFERENCED_CONSTRAINTS |
1378 EXCLUDE_SELF_REFERENCES |
1379 INCLUDE_ALL_TABLE_TYPES);
1380 List *foreignKeysRelationInvolved = list_concat(referencingForeignKeys,
1381 referencedForeignKeys);
1382 Oid foreignKeyId = InvalidOid;
1383 foreach_oid(foreignKeyId, foreignKeysRelationInvolved)
1384 {
1385 HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(foreignKeyId));
1386 if (!HeapTupleIsValid(heapTuple))
1387 {
1388 /* not possible but be on the safe side */
1389 continue;
1390 }
1391
1392 Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
1393 Oid parentConstraintId = constraintForm->conparentid;
1394 if (!OidIsValid(parentConstraintId))
1395 {
1396 return true;
1397 }
1398 }
1399
1400 return false;
1401 }
1402