1 /*-------------------------------------------------------------------------
2  *
3  * foreign_constraint.c
4  *
5  * This file contains functions to create, alter and drop foreign
6  * constraints on distributed tables.
7  *
8  * Copyright (c) Citus Data, Inc.
9  *
10  *-------------------------------------------------------------------------
11  */
12 
13 #include "postgres.h"
14 
15 #include "distributed/pg_version_constants.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/xact.h"
20 #include "catalog/namespace.h"
21 #include "catalog/pg_constraint.h"
22 #include "access/genam.h"
23 #include "catalog/pg_type.h"
24 #include "distributed/colocation_utils.h"
25 #include "distributed/commands.h"
26 #include "distributed/coordinator_protocol.h"
27 #include "distributed/listutils.h"
28 #include "distributed/coordinator_protocol.h"
29 #include "distributed/multi_join_order.h"
30 #include "distributed/namespace_utils.h"
31 #include "distributed/reference_table_utils.h"
32 #include "distributed/version_compat.h"
33 #include "utils/builtins.h"
34 #include "utils/fmgroids.h"
35 #include "utils/inval.h"
36 #include "utils/lsyscache.h"
37 #include "utils/rel.h"
38 #include "utils/relcache.h"
39 #include "utils/ruleutils.h"
40 #include "utils/syscache.h"
41 
42 
43 #define BehaviorIsRestrictOrNoAction(x) \
44 	((x) == FKCONSTR_ACTION_NOACTION || (x) == FKCONSTR_ACTION_RESTRICT)
45 
46 
47 #define USE_CREATE_REFERENCE_TABLE_HINT \
48 	"You could use SELECT create_reference_table('%s') " \
49 	"to replicate the referenced table to all nodes or " \
50 	"consider dropping the foreign key"
51 
52 
53 typedef bool (*CheckRelationFunc)(Oid);
54 
55 
56 /* Local functions forward declarations */
57 static void EnsureReferencingTableNotReplicated(Oid referencingTableId);
58 static void EnsureSupportedFKeyOnDistKey(Form_pg_constraint constraintForm);
59 static void EnsureSupportedFKeyBetweenCitusLocalAndRefTable(Form_pg_constraint
60 															constraintForm,
61 															char
62 															referencingReplicationModel,
63 															char
64 															referencedReplicationModel,
65 															Oid referencedTableId);
66 static bool HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple,
67 													   Oid relationId,
68 													   int pgConstraintKey,
69 													   char *columnName);
70 static Oid FindForeignKeyOidWithName(List *foreignKeyOids, const
71 									 char *inputConstraintName);
72 static void ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
73 										  Var *referencingDistColumn,
74 										  Var *referencedDistColumn,
75 										  int *referencingAttrIndex,
76 										  int *referencedAttrIndex);
77 static List * GetForeignKeyIdsForColumn(char *columnName, Oid relationId,
78 										int searchForeignKeyColumnFlags);
79 static Oid get_relation_constraint_oid_compat(HeapTuple heapTuple);
80 static List * GetForeignKeysWithLocalTables(Oid relationId);
81 static bool IsTableTypeIncluded(Oid relationId, int flags);
82 static void UpdateConstraintIsValid(Oid constraintId, bool isValid);
83 
84 
85 /*
86  * ConstraintIsAForeignKeyToReferenceTable checks if the given constraint is a
87  * foreign key constraint from the given relation to any reference table.
88  */
89 bool
ConstraintIsAForeignKeyToReferenceTable(char * inputConstaintName,Oid relationId)90 ConstraintIsAForeignKeyToReferenceTable(char *inputConstaintName, Oid relationId)
91 {
92 	int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_REFERENCE_TABLES;
93 	List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
94 
95 	Oid foreignKeyOid = FindForeignKeyOidWithName(foreignKeyOids, inputConstaintName);
96 
97 	return OidIsValid(foreignKeyOid);
98 }
99 
100 
101 /*
102  * ErrorIfUnsupportedForeignConstraintExists runs checks related to foreign
103  * constraints and errors out if it is not possible to create one of the
104  * foreign constraint in distributed environment.
105  *
106  * To support foreign constraints, we require that;
107  * - If referencing and referenced tables are hash-distributed
108  *		- Referencing and referenced tables are co-located.
109  *      - Foreign constraint is defined over distribution column.
110  *		- ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and
111  *        ON UPDATE CASCADE options
112  *        are not used.
113  *      - Replication factors of referencing and referenced table are 1.
114  * - If referenced table is a reference table
115  *      - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and
116  *        ON UPDATE CASCADE options are not used on the distribution key
117  *        of the referencing column.
118  * - If referencing table is a reference table, error out if the referenced
119  *   table is a distributed table.
120  * - If referencing table is a reference table and referenced table is a
121  *   citus local table:
122  *      - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and
123  *        ON CASCADE options are not used.
124  * - If referencing or referenced table is distributed table, then the
125  *   other table is not a citus local table.
126  */
127 void
ErrorIfUnsupportedForeignConstraintExists(Relation relation,char referencingDistMethod,char referencingReplicationModel,Var * referencingDistKey,uint32 referencingColocationId)128 ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDistMethod,
129 										  char referencingReplicationModel,
130 										  Var *referencingDistKey,
131 										  uint32 referencingColocationId)
132 {
133 	Oid referencingTableId = relation->rd_id;
134 
135 	int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
136 	List *foreignKeyOids = GetForeignKeyOids(referencingTableId, flags);
137 
138 	Oid foreignKeyOid = InvalidOid;
139 	foreach_oid(foreignKeyOid, foreignKeyOids)
140 	{
141 		HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(foreignKeyOid));
142 
143 		Assert(HeapTupleIsValid(heapTuple));
144 
145 		Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
146 
147 		int referencingAttrIndex = -1;
148 
149 		Var *referencedDistKey = NULL;
150 		int referencedAttrIndex = -1;
151 		uint32 referencedColocationId = INVALID_COLOCATION_ID;
152 
153 		Oid referencedTableId = constraintForm->confrelid;
154 		bool referencedIsCitus = IsCitusTable(referencedTableId);
155 
156 		bool selfReferencingTable = (referencingTableId == referencedTableId);
157 
158 		if (!referencedIsCitus && !selfReferencingTable)
159 		{
160 			if (IsCitusLocalTableByDistParams(referencingDistMethod,
161 											  referencingReplicationModel))
162 			{
163 				ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(referencedTableId);
164 			}
165 
166 			char *referencedTableName = get_rel_name(referencedTableId);
167 
168 			ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
169 							errmsg("referenced table \"%s\" must be a distributed table"
170 								   " or a reference table",
171 								   referencedTableName),
172 							errdetail("To enforce foreign keys, the referencing and "
173 									  "referenced rows need to be stored on the same "
174 									  "node."),
175 							errhint(USE_CREATE_REFERENCE_TABLE_HINT,
176 									referencedTableName)));
177 		}
178 
179 		/* set referenced table related variables here if table is referencing itself */
180 		char referencedDistMethod = 0;
181 		char referencedReplicationModel = REPLICATION_MODEL_INVALID;
182 		if (!selfReferencingTable)
183 		{
184 			referencedDistMethod = PartitionMethod(referencedTableId);
185 			referencedDistKey = IsCitusTableType(referencedTableId,
186 												 CITUS_TABLE_WITH_NO_DIST_KEY) ?
187 								NULL :
188 								DistPartitionKey(referencedTableId);
189 			referencedColocationId = TableColocationId(referencedTableId);
190 			referencedReplicationModel = TableReplicationModel(referencedTableId);
191 		}
192 		else
193 		{
194 			referencedDistMethod = referencingDistMethod;
195 			referencedDistKey = referencingDistKey;
196 			referencedColocationId = referencingColocationId;
197 			referencedReplicationModel = referencingReplicationModel;
198 		}
199 
200 		bool referencingIsCitusLocalOrRefTable =
201 			(referencingDistMethod == DISTRIBUTE_BY_NONE);
202 		bool referencedIsCitusLocalOrRefTable =
203 			(referencedDistMethod == DISTRIBUTE_BY_NONE);
204 		if (referencingIsCitusLocalOrRefTable && referencedIsCitusLocalOrRefTable)
205 		{
206 			EnsureSupportedFKeyBetweenCitusLocalAndRefTable(constraintForm,
207 															referencingReplicationModel,
208 															referencedReplicationModel,
209 															referencedTableId);
210 
211 			ReleaseSysCache(heapTuple);
212 			continue;
213 		}
214 
215 		/*
216 		 * Foreign keys from citus local tables or reference tables to distributed
217 		 * tables are not supported.
218 		 */
219 		if (referencingIsCitusLocalOrRefTable && !referencedIsCitusLocalOrRefTable)
220 		{
221 			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
222 							errmsg("cannot create foreign key constraint "
223 								   "since foreign keys from reference tables "
224 								   "and local tables to distributed tables "
225 								   "are not supported"),
226 							errdetail("Reference tables and local tables "
227 									  "can only have foreign keys to reference "
228 									  "tables and local tables")));
229 		}
230 
231 		/*
232 		 * To enforce foreign constraints, tables must be co-located unless a
233 		 * reference table is referenced.
234 		 */
235 		bool referencedIsReferenceTable =
236 			(referencedReplicationModel == REPLICATION_MODEL_2PC);
237 		if (!referencedIsReferenceTable && (
238 				referencingColocationId == INVALID_COLOCATION_ID ||
239 				referencingColocationId != referencedColocationId))
240 		{
241 			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
242 							errmsg("cannot create foreign key constraint since "
243 								   "relations are not colocated or not referencing "
244 								   "a reference table"),
245 							errdetail(
246 								"A distributed table can only have foreign keys "
247 								"if it is referencing another colocated hash "
248 								"distributed table or a reference table")));
249 		}
250 
251 		ForeignConstraintFindDistKeys(heapTuple,
252 									  referencingDistKey,
253 									  referencedDistKey,
254 									  &referencingAttrIndex,
255 									  &referencedAttrIndex);
256 		bool referencingColumnsIncludeDistKey = (referencingAttrIndex != -1);
257 		bool foreignConstraintOnDistKey =
258 			(referencingColumnsIncludeDistKey && referencingAttrIndex ==
259 			 referencedAttrIndex);
260 
261 		/*
262 		 * If columns in the foreign key includes the distribution key from the
263 		 * referencing side, we do not allow update/delete operations through
264 		 * foreign key constraints (e.g. ... ON UPDATE SET NULL)
265 		 */
266 		if (referencingColumnsIncludeDistKey)
267 		{
268 			EnsureSupportedFKeyOnDistKey(constraintForm);
269 		}
270 
271 		/*
272 		 * if tables are hash-distributed and colocated, we need to make sure that
273 		 * the distribution key is included in foreign constraint.
274 		 */
275 		if (!referencedIsCitusLocalOrRefTable && !foreignConstraintOnDistKey)
276 		{
277 			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
278 							errmsg("cannot create foreign key constraint"),
279 							errdetail("Foreign keys are supported in two cases, "
280 									  "either in between two colocated tables including "
281 									  "partition column in the same ordinal in the both "
282 									  "tables or from distributed to reference tables")));
283 		}
284 
285 		/*
286 		 * We do not allow to create foreign constraints if shard replication factor is
287 		 * greater than 1. Because in our current design, multiple replicas may cause
288 		 * locking problems and inconsistent shard contents.
289 		 *
290 		 * Note that we allow referenced table to be a reference table (e.g., not a
291 		 * single replicated table). This is allowed since (a) we are sure that
292 		 * placements always be in the same state (b) executors are aware of reference
293 		 * tables and handle concurrency related issues accordingly.
294 		 */
295 		EnsureReferencingTableNotReplicated(referencingTableId);
296 
297 		ReleaseSysCache(heapTuple);
298 	}
299 }
300 
301 
302 /*
303  * EnsureSupportedFKeyBetweenCitusLocalAndRefTable is a helper function that
304  * takes a foreign key constraint form for a foreign key between two citus
305  * tables that are either citus local table or reference table and errors
306  * out if it it an unsupported foreign key from a reference table to a citus
307  * local table according to given replication model parameters.
308  */
309 static void
EnsureSupportedFKeyBetweenCitusLocalAndRefTable(Form_pg_constraint fKeyConstraintForm,char referencingReplicationModel,char referencedReplicationModel,Oid referencedTableId)310 EnsureSupportedFKeyBetweenCitusLocalAndRefTable(Form_pg_constraint fKeyConstraintForm,
311 												char referencingReplicationModel,
312 												char referencedReplicationModel,
313 												Oid referencedTableId)
314 {
315 	bool referencingIsReferenceTable =
316 		(referencingReplicationModel == REPLICATION_MODEL_2PC);
317 	bool referencedIsCitusLocalTable =
318 		(referencedReplicationModel != REPLICATION_MODEL_2PC);
319 	if (referencingIsReferenceTable && referencedIsCitusLocalTable)
320 	{
321 		/*
322 		 * We only support RESTRICT and NO ACTION behaviors for the
323 		 * foreign keys from reference tables to citus local tables.
324 		 * This is because, we can't cascade dml operations from citus
325 		 * local tables's coordinator placement to the remote placements
326 		 * of the reference table.
327 		 * Note that for the foreign keys from citus local tables to
328 		 * reference tables, we support all foreign key behaviors.
329 		 */
330 		if (!(BehaviorIsRestrictOrNoAction(fKeyConstraintForm->confdeltype) &&
331 			  BehaviorIsRestrictOrNoAction(fKeyConstraintForm->confupdtype)))
332 		{
333 			char *referencedTableName = get_rel_name(referencedTableId);
334 			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
335 							errmsg("cannot define foreign key constraint, "
336 								   "foreign keys from reference tables to "
337 								   "local tables can only be defined "
338 								   "with NO ACTION or RESTRICT behaviors"),
339 							errhint(USE_CREATE_REFERENCE_TABLE_HINT,
340 									referencedTableName)));
341 		}
342 	}
343 }
344 
345 
346 /*
347  * EnsureSupportedFKeyOnDistKey errors out if given foreign key constraint form
348  * implies an unsupported ON DELETE/UPDATE behavior assuming the referencing column
349  * is the distribution column of the referencing distributed table.
350  */
351 static void
EnsureSupportedFKeyOnDistKey(Form_pg_constraint fKeyConstraintForm)352 EnsureSupportedFKeyOnDistKey(Form_pg_constraint fKeyConstraintForm)
353 {
354 	/*
355 	 * ON DELETE SET NULL and ON DELETE SET DEFAULT is not supported. Because we do
356 	 * not want to set partition column to NULL or default value.
357 	 */
358 	if (fKeyConstraintForm->confdeltype == FKCONSTR_ACTION_SETNULL ||
359 		fKeyConstraintForm->confdeltype == FKCONSTR_ACTION_SETDEFAULT)
360 	{
361 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
362 						errmsg("cannot create foreign key constraint"),
363 						errdetail("SET NULL or SET DEFAULT is not supported "
364 								  "in ON DELETE operation when distribution "
365 								  "key is included in the foreign key constraint")));
366 	}
367 
368 	/*
369 	 * ON UPDATE SET NULL, ON UPDATE SET DEFAULT and UPDATE CASCADE is not supported.
370 	 * Because we do not want to set partition column to NULL or default value. Also
371 	 * cascading update operation would require re-partitioning. Updating partition
372 	 * column value is not allowed anyway even outside of foreign key concept.
373 	 */
374 	if (fKeyConstraintForm->confupdtype == FKCONSTR_ACTION_SETNULL ||
375 		fKeyConstraintForm->confupdtype == FKCONSTR_ACTION_SETDEFAULT ||
376 		fKeyConstraintForm->confupdtype == FKCONSTR_ACTION_CASCADE)
377 	{
378 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
379 						errmsg("cannot create foreign key constraint"),
380 						errdetail("SET NULL, SET DEFAULT or CASCADE is not "
381 								  "supported in ON UPDATE operation when "
382 								  "distribution key included in the foreign "
383 								  "constraint.")));
384 	}
385 }
386 
387 
388 /*
389  * EnsureReferencingTableNotReplicated takes referencingTableId for the
390  * referencing table of the foreign key and errors out if it's not a single
391  * replicated table.
392  */
393 static void
EnsureReferencingTableNotReplicated(Oid referencingTableId)394 EnsureReferencingTableNotReplicated(Oid referencingTableId)
395 {
396 	bool referencingNotReplicated = true;
397 	bool referencingIsCitus = IsCitusTable(referencingTableId);
398 
399 	if (referencingIsCitus)
400 	{
401 		/* ALTER TABLE command is applied over single replicated table */
402 		referencingNotReplicated = SingleReplicatedTable(referencingTableId);
403 	}
404 	else
405 	{
406 		/* Creating single replicated table with foreign constraint */
407 		referencingNotReplicated = !DistributedTableReplicationIsEnabled();
408 	}
409 
410 	if (!referencingNotReplicated)
411 	{
412 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
413 						errmsg("cannot create foreign key constraint"),
414 						errdetail("Citus Community Edition currently supports "
415 								  "foreign key constraints only for "
416 								  "\"citus.shard_replication_factor = 1\"."),
417 						errhint("Please change \"citus.shard_replication_factor to "
418 								"1\". To learn more about using foreign keys with "
419 								"other replication factors, please contact us at "
420 								"https://citusdata.com/about/contact_us.")));
421 	}
422 }
423 
424 
425 /*
426  * ErrorOutForFKeyBetweenPostgresAndCitusLocalTable is a helper function to
427  * error out for foreign keys between postgres local tables and citus local
428  * tables.
429  */
430 void
ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(Oid localTableId)431 ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(Oid localTableId)
432 {
433 	char *localTableName = get_rel_name(localTableId);
434 	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
435 					errmsg("cannot create foreign key constraint as \"%s\" is "
436 						   "a postgres local table", localTableName),
437 					errhint("first add local table to citus metadata "
438 							"by using SELECT citus_add_local_table_to_metadata('%s') "
439 							"and execute the ALTER TABLE command to create the "
440 							"foreign key to local table", localTableName)));
441 }
442 
443 
444 /*
445  * ForeignConstraintFindDistKeys finds the index of the given distribution columns
446  * in the given foreign key constraint and returns them in referencingAttrIndex
447  * and referencedAttrIndex. If one of them is not found, it returns -1 instead.
448  */
449 static void
ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,Var * referencingDistColumn,Var * referencedDistColumn,int * referencingAttrIndex,int * referencedAttrIndex)450 ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
451 							  Var *referencingDistColumn,
452 							  Var *referencedDistColumn,
453 							  int *referencingAttrIndex,
454 							  int *referencedAttrIndex)
455 {
456 	Datum *referencingColumnArray = NULL;
457 	int referencingColumnCount = 0;
458 	Datum *referencedColumnArray = NULL;
459 	int referencedColumnCount = 0;
460 	bool isNull = false;
461 
462 	*referencedAttrIndex = -1;
463 	*referencedAttrIndex = -1;
464 
465 	/*
466 	 * Column attributes are not available in Form_pg_constraint, therefore we need
467 	 * to find them in the system catalog. After finding them, we iterate over column
468 	 * attributes together because partition column must be at the same place in both
469 	 * referencing and referenced side of the foreign key constraint.
470 	 */
471 	Datum referencingColumnsDatum = SysCacheGetAttr(CONSTROID, pgConstraintTuple,
472 													Anum_pg_constraint_conkey, &isNull);
473 	Datum referencedColumnsDatum = SysCacheGetAttr(CONSTROID, pgConstraintTuple,
474 												   Anum_pg_constraint_confkey, &isNull);
475 
476 	deconstruct_array(DatumGetArrayTypeP(referencingColumnsDatum), INT2OID, 2, true,
477 					  's', &referencingColumnArray, NULL, &referencingColumnCount);
478 	deconstruct_array(DatumGetArrayTypeP(referencedColumnsDatum), INT2OID, 2, true,
479 					  's', &referencedColumnArray, NULL, &referencedColumnCount);
480 
481 	Assert(referencingColumnCount == referencedColumnCount);
482 
483 	for (int attrIdx = 0; attrIdx < referencingColumnCount; ++attrIdx)
484 	{
485 		AttrNumber referencingAttrNo = DatumGetInt16(referencingColumnArray[attrIdx]);
486 		AttrNumber referencedAttrNo = DatumGetInt16(referencedColumnArray[attrIdx]);
487 
488 		if (referencedDistColumn != NULL &&
489 			referencedDistColumn->varattno == referencedAttrNo)
490 		{
491 			*referencedAttrIndex = attrIdx;
492 		}
493 
494 		if (referencingDistColumn != NULL &&
495 			referencingDistColumn->varattno == referencingAttrNo)
496 		{
497 			*referencingAttrIndex = attrIdx;
498 		}
499 	}
500 }
501 
502 
503 /*
504  * ColumnAppearsInForeignKey returns true if there is a foreign key constraint
505  * from/to given column. False otherwise.
506  */
507 bool
ColumnAppearsInForeignKey(char * columnName,Oid relationId)508 ColumnAppearsInForeignKey(char *columnName, Oid relationId)
509 {
510 	int searchForeignKeyColumnFlags = SEARCH_REFERENCING_RELATION |
511 									  SEARCH_REFERENCED_RELATION;
512 	List *foreignKeysColumnAppeared =
513 		GetForeignKeyIdsForColumn(columnName, relationId, searchForeignKeyColumnFlags);
514 	return list_length(foreignKeysColumnAppeared) > 0;
515 }
516 
517 
518 /*
519  * ColumnAppearsInForeignKeyToReferenceTable checks if there is a foreign key
520  * constraint from/to any reference table on the given column.
521  */
522 bool
ColumnAppearsInForeignKeyToReferenceTable(char * columnName,Oid relationId)523 ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
524 {
525 	int searchForeignKeyColumnFlags = SEARCH_REFERENCING_RELATION |
526 									  SEARCH_REFERENCED_RELATION;
527 	List *foreignKeyIdsColumnAppeared =
528 		GetForeignKeyIdsForColumn(columnName, relationId, searchForeignKeyColumnFlags);
529 
530 	Oid foreignKeyId = InvalidOid;
531 	foreach_oid(foreignKeyId, foreignKeyIdsColumnAppeared)
532 	{
533 		Oid referencedTableId = GetReferencedTableId(foreignKeyId);
534 		if (IsCitusTableType(referencedTableId, REFERENCE_TABLE))
535 		{
536 			return true;
537 		}
538 	}
539 
540 	return false;
541 }
542 
543 
544 /*
545  * GetForeignKeyIdsForColumn takes columnName and relationId for the owning
546  * relation, and returns a list of OIDs for foreign constraints that the column
547  * with columnName is involved according to "searchForeignKeyColumnFlags" argument.
548  * See SearchForeignKeyColumnFlags enum definition for usage.
549  */
550 static List *
GetForeignKeyIdsForColumn(char * columnName,Oid relationId,int searchForeignKeyColumnFlags)551 GetForeignKeyIdsForColumn(char *columnName, Oid relationId,
552 						  int searchForeignKeyColumnFlags)
553 {
554 	bool searchReferencing = searchForeignKeyColumnFlags & SEARCH_REFERENCING_RELATION;
555 	bool searchReferenced = searchForeignKeyColumnFlags & SEARCH_REFERENCED_RELATION;
556 
557 	/* at least one of them should be true */
558 	Assert(searchReferencing || searchReferenced);
559 
560 	List *foreignKeyIdsColumnAppeared = NIL;
561 
562 	ScanKeyData scanKey[1];
563 	int scanKeyCount = 1;
564 
565 	Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
566 
567 	ScanKeyInit(&scanKey[0], Anum_pg_constraint_contype, BTEqualStrategyNumber,
568 				F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
569 
570 	SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, InvalidOid, false,
571 													NULL, scanKeyCount, scanKey);
572 
573 	HeapTuple heapTuple = systable_getnext(scanDescriptor);
574 	while (HeapTupleIsValid(heapTuple))
575 	{
576 		int pgConstraintKey = 0;
577 		Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
578 
579 		Oid referencedTableId = constraintForm->confrelid;
580 		Oid referencingTableId = constraintForm->conrelid;
581 
582 		if (referencedTableId == relationId && searchReferenced)
583 		{
584 			pgConstraintKey = Anum_pg_constraint_confkey;
585 		}
586 		else if (referencingTableId == relationId && searchReferencing)
587 		{
588 			pgConstraintKey = Anum_pg_constraint_conkey;
589 		}
590 		else
591 		{
592 			/*
593 			 * If the constraint is not from/to the given relation, we should simply
594 			 * skip.
595 			 */
596 			heapTuple = systable_getnext(scanDescriptor);
597 			continue;
598 		}
599 
600 		if (HeapTupleOfForeignConstraintIncludesColumn(heapTuple, relationId,
601 													   pgConstraintKey, columnName))
602 		{
603 			Oid foreignKeyOid = get_relation_constraint_oid_compat(heapTuple);
604 			foreignKeyIdsColumnAppeared = lappend_oid(foreignKeyIdsColumnAppeared,
605 													  foreignKeyOid);
606 		}
607 
608 		heapTuple = systable_getnext(scanDescriptor);
609 	}
610 
611 	/* clean up scan and close system catalog */
612 	systable_endscan(scanDescriptor);
613 	table_close(pgConstraint, NoLock);
614 
615 	return foreignKeyIdsColumnAppeared;
616 }
617 
618 
619 /*
620  * GetReferencingForeignConstaintCommands takes in a relationId, and
621  * returns the list of foreign constraint commands needed to reconstruct
622  * foreign key constraints that the table is involved in as the "referencing"
623  * one.
624  */
625 List *
GetReferencingForeignConstaintCommands(Oid relationId)626 GetReferencingForeignConstaintCommands(Oid relationId)
627 {
628 	int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
629 	return GetForeignConstraintCommandsInternal(relationId, flags);
630 }
631 
632 
633 /*
634  * GetForeignConstraintToReferenceTablesCommands takes in a relationId, and
635  * returns the list of foreign constraint commands needed to reconstruct
636  * foreign key constraints that the table is involved in as the "referencing"
637  * one and the "referenced" table is a reference table.
638  */
639 List *
GetForeignConstraintToReferenceTablesCommands(Oid relationId)640 GetForeignConstraintToReferenceTablesCommands(Oid relationId)
641 {
642 	int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_REFERENCE_TABLES;
643 	return GetForeignConstraintCommandsInternal(relationId, flags);
644 }
645 
646 
647 /*
648  * GetForeignConstraintToDistributedTablesCommands takes in a relationId, and
649  * returns the list of foreign constraint commands needed to reconstruct
650  * foreign key constraints that the table is involved in as the "referencing"
651  * one and the "referenced" table is a distributed table.
652  */
653 List *
GetForeignConstraintToDistributedTablesCommands(Oid relationId)654 GetForeignConstraintToDistributedTablesCommands(Oid relationId)
655 {
656 	int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_DISTRIBUTED_TABLES;
657 	return GetForeignConstraintCommandsInternal(relationId, flags);
658 }
659 
660 
661 /*
662  * GetForeignConstraintFromDistributedTablesCommands takes in a relationId, and
663  * returns the list of foreign constraint commands needed to reconstruct
664  * foreign key constraints that the table is involved in as the "referenced"
665  * one and the "referencing" table is a distributed table.
666  */
667 List *
GetForeignConstraintFromDistributedTablesCommands(Oid relationId)668 GetForeignConstraintFromDistributedTablesCommands(Oid relationId)
669 {
670 	int flags = INCLUDE_REFERENCED_CONSTRAINTS | INCLUDE_DISTRIBUTED_TABLES;
671 	return GetForeignConstraintCommandsInternal(relationId, flags);
672 }
673 
674 
675 /*
676  * GetForeignConstraintCommandsInternal is a wrapper function to get the
677  * DDL commands to recreate the foreign key constraints returned by
678  * GetForeignKeyOids. See more details at the underlying function.
679  */
680 List *
GetForeignConstraintCommandsInternal(Oid relationId,int flags)681 GetForeignConstraintCommandsInternal(Oid relationId, int flags)
682 {
683 	List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
684 
685 	List *foreignKeyCommands = NIL;
686 
687 	PushOverrideEmptySearchPath(CurrentMemoryContext);
688 
689 	Oid foreignKeyOid = InvalidOid;
690 	foreach_oid(foreignKeyOid, foreignKeyOids)
691 	{
692 		char *statementDef = pg_get_constraintdef_command(foreignKeyOid);
693 
694 		foreignKeyCommands = lappend(foreignKeyCommands, statementDef);
695 	}
696 
697 	/* revert back to original search_path */
698 	PopOverrideSearchPath();
699 
700 	return foreignKeyCommands;
701 }
702 
703 
704 /*
705  * get_relation_constraint_oid_compat returns OID of the constraint represented
706  * by the constraintForm, which is passed as an heapTuple. OID of the contraint
707  * is already stored in the constraintForm struct if major PostgreSQL version is
708  * 12. However, in the older versions, we should utilize HeapTupleGetOid to deduce
709  * that OID with no cost.
710  */
711 static Oid
get_relation_constraint_oid_compat(HeapTuple heapTuple)712 get_relation_constraint_oid_compat(HeapTuple heapTuple)
713 {
714 	Assert(heapTuple != NULL);
715 
716 
717 	Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
718 	Oid constraintOid = constraintForm->oid;
719 
720 	return constraintOid;
721 }
722 
723 
724 /*
725  * HasForeignKeyToLocalTable returns true if relation has foreign key
726  * relationship with a local table.
727  */
728 bool
HasForeignKeyWithLocalTable(Oid relationId)729 HasForeignKeyWithLocalTable(Oid relationId)
730 {
731 	List *foreignKeysWithLocalTables = GetForeignKeysWithLocalTables(relationId);
732 	return list_length(foreignKeysWithLocalTables) > 0;
733 }
734 
735 
736 /*
737  * GetForeignKeysWithLocalTables returns a list of foreign keys for foreign key
738  * relationships that relation has with local tables.
739  */
740 static List *
GetForeignKeysWithLocalTables(Oid relationId)741 GetForeignKeysWithLocalTables(Oid relationId)
742 {
743 	int referencingFKeysFlag = INCLUDE_REFERENCING_CONSTRAINTS |
744 							   INCLUDE_LOCAL_TABLES;
745 	List *referencingFKeyList = GetForeignKeyOids(relationId, referencingFKeysFlag);
746 
747 	/* already captured self referencing foreign keys, so use EXCLUDE_SELF_REFERENCES */
748 	int referencedFKeysFlag = INCLUDE_REFERENCED_CONSTRAINTS |
749 							  EXCLUDE_SELF_REFERENCES |
750 							  INCLUDE_LOCAL_TABLES;
751 	List *referencedFKeyList = GetForeignKeyOids(relationId, referencedFKeysFlag);
752 	return list_concat(referencingFKeyList, referencedFKeyList);
753 }
754 
755 
756 /*
757  * GetForeignKeysFromLocalTables returns a list of foreign keys where the referencing
758  * relation is a local table.
759  */
760 List *
GetForeignKeysFromLocalTables(Oid relationId)761 GetForeignKeysFromLocalTables(Oid relationId)
762 {
763 	int referencedFKeysFlag = INCLUDE_REFERENCED_CONSTRAINTS |
764 							  INCLUDE_LOCAL_TABLES;
765 	List *referencingFKeyList = GetForeignKeyOids(relationId, referencedFKeysFlag);
766 
767 	return referencingFKeyList;
768 }
769 
770 
771 /*
772  * HasForeignKeyToCitusLocalTable returns true if any of the foreign key constraints
773  * on the relation with relationId references to a citus local table.
774  */
775 bool
HasForeignKeyToCitusLocalTable(Oid relationId)776 HasForeignKeyToCitusLocalTable(Oid relationId)
777 {
778 	int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_CITUS_LOCAL_TABLES;
779 	List *foreignKeyOidList = GetForeignKeyOids(relationId, flags);
780 	return list_length(foreignKeyOidList) > 0;
781 }
782 
783 
784 /*
785  * HasForeignKeyToReferenceTable returns true if any of the foreign key
786  * constraints on the relation with relationId references to a reference
787  * table.
788  */
789 bool
HasForeignKeyToReferenceTable(Oid relationId)790 HasForeignKeyToReferenceTable(Oid relationId)
791 {
792 	int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_REFERENCE_TABLES;
793 	List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
794 
795 	return list_length(foreignKeyOids) > 0;
796 }
797 
798 
799 /*
800  * TableReferenced function checks whether given table is referenced by another table
801  * via foreign constraints. If it is referenced, this function returns true.
802  */
803 bool
TableReferenced(Oid relationId)804 TableReferenced(Oid relationId)
805 {
806 	int flags = INCLUDE_REFERENCED_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
807 	List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
808 
809 	return list_length(foreignKeyOids) > 0;
810 }
811 
812 
813 /*
814  * HeapTupleOfForeignConstraintIncludesColumn fetches the columns from the foreign
815  * constraint and checks if the given column name matches one of them.
816  */
817 static bool
HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple,Oid relationId,int pgConstraintKey,char * columnName)818 HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple, Oid relationId,
819 										   int pgConstraintKey, char *columnName)
820 {
821 	Datum *columnArray = NULL;
822 	int columnCount = 0;
823 	bool isNull = false;
824 
825 	Datum columnsDatum = SysCacheGetAttr(CONSTROID, heapTuple, pgConstraintKey, &isNull);
826 	deconstruct_array(DatumGetArrayTypeP(columnsDatum), INT2OID, 2, true,
827 					  's', &columnArray, NULL, &columnCount);
828 
829 	for (int attrIdx = 0; attrIdx < columnCount; ++attrIdx)
830 	{
831 		AttrNumber attrNo = DatumGetInt16(columnArray[attrIdx]);
832 
833 		char *colName = get_attname(relationId, attrNo, false);
834 		if (strncmp(colName, columnName, NAMEDATALEN) == 0)
835 		{
836 			return true;
837 		}
838 	}
839 
840 	return false;
841 }
842 
843 
844 /*
845  * TableReferencing function checks whether given table is referencing to another
846  * table via foreign key constraints. If it is referencing, this function returns
847  * true.
848  */
849 bool
TableReferencing(Oid relationId)850 TableReferencing(Oid relationId)
851 {
852 	int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
853 	List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
854 
855 	return list_length(foreignKeyOids) > 0;
856 }
857 
858 
859 /*
860  * ConstraintWithNameIsOfType is a wrapper around ConstraintWithNameIsOfType that returns true
861  * if given constraint name identifies a uniqueness constraint, i.e:
862  *   - primary key constraint, or
863  *   - unique constraint
864  */
865 bool
ConstraintIsAUniquenessConstraint(char * inputConstaintName,Oid relationId)866 ConstraintIsAUniquenessConstraint(char *inputConstaintName, Oid relationId)
867 {
868 	bool isUniqueConstraint = ConstraintWithNameIsOfType(inputConstaintName, relationId,
869 														 CONSTRAINT_UNIQUE);
870 	bool isPrimaryConstraint = ConstraintWithNameIsOfType(inputConstaintName, relationId,
871 														  CONSTRAINT_PRIMARY);
872 	return isUniqueConstraint || isPrimaryConstraint;
873 }
874 
875 
876 /*
877  * ConstraintIsAForeignKey is a wrapper around ConstraintWithNameIsOfType that returns true
878  * if given constraint name identifies a foreign key constraint.
879  */
880 bool
ConstraintIsAForeignKey(char * inputConstaintName,Oid relationId)881 ConstraintIsAForeignKey(char *inputConstaintName, Oid relationId)
882 {
883 	return ConstraintWithNameIsOfType(inputConstaintName, relationId, CONSTRAINT_FOREIGN);
884 }
885 
886 
887 /*
888  * ConstraintWithNameIsOfType is a wrapper around get_relation_constraint_oid that
889  * returns true if given constraint name identifies a valid constraint defined
890  * on relation with relationId and it's type matches the input constraint type.
891  */
892 bool
ConstraintWithNameIsOfType(char * inputConstaintName,Oid relationId,char targetConstraintType)893 ConstraintWithNameIsOfType(char *inputConstaintName, Oid relationId,
894 						   char targetConstraintType)
895 {
896 	bool missingOk = true;
897 	Oid constraintId =
898 		get_relation_constraint_oid(relationId, inputConstaintName, missingOk);
899 	return ConstraintWithIdIsOfType(constraintId, targetConstraintType);
900 }
901 
902 
903 /*
904  * ConstraintWithIdIsOfType returns true if constraint with constraintId exists
905  * and is of type targetConstraintType.
906  */
907 bool
ConstraintWithIdIsOfType(Oid constraintId,char targetConstraintType)908 ConstraintWithIdIsOfType(Oid constraintId, char targetConstraintType)
909 {
910 	HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraintId));
911 	if (!HeapTupleIsValid(heapTuple))
912 	{
913 		/* no such constraint */
914 		return false;
915 	}
916 
917 	Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
918 	char constraintType = constraintForm->contype;
919 	bool constraintTypeMatches = (constraintType == targetConstraintType);
920 
921 	ReleaseSysCache(heapTuple);
922 
923 	return constraintTypeMatches;
924 }
925 
926 
927 /*
928  * FindForeignKeyOidWithName searches the foreign key constraint with
929  * inputConstraintName in the given list of foreign key constraint OIDs.
930  * Returns the OID of the matching constraint. If there no matching constraint
931  * in the given list, then returns InvalidOid.
932  */
933 static Oid
FindForeignKeyOidWithName(List * foreignKeyOids,const char * inputConstraintName)934 FindForeignKeyOidWithName(List *foreignKeyOids, const char *inputConstraintName)
935 {
936 	Oid foreignKeyOid = InvalidOid;
937 	foreach_oid(foreignKeyOid, foreignKeyOids)
938 	{
939 		char *constraintName = get_constraint_name(foreignKeyOid);
940 
941 		Assert(constraintName != NULL);
942 
943 		if (strncmp(constraintName, inputConstraintName, NAMEDATALEN) == 0)
944 		{
945 			return foreignKeyOid;
946 		}
947 	}
948 
949 	return InvalidOid;
950 }
951 
952 
953 /*
954  * TableHasExternalForeignKeys returns true if the relation with relationId is
955  * involved in a foreign key relationship other than the self-referencing ones.
956  */
957 bool
TableHasExternalForeignKeys(Oid relationId)958 TableHasExternalForeignKeys(Oid relationId)
959 {
960 	int flags = (INCLUDE_REFERENCING_CONSTRAINTS | EXCLUDE_SELF_REFERENCES |
961 				 INCLUDE_ALL_TABLE_TYPES);
962 	List *foreignKeyIdsTableReferencing = GetForeignKeyOids(relationId, flags);
963 
964 	flags = (INCLUDE_REFERENCED_CONSTRAINTS | EXCLUDE_SELF_REFERENCES |
965 			 INCLUDE_ALL_TABLE_TYPES);
966 	List *foreignKeyIdsTableReferenced = GetForeignKeyOids(relationId, flags);
967 
968 	List *foreignKeysWithOtherTables = list_concat(foreignKeyIdsTableReferencing,
969 												   foreignKeyIdsTableReferenced);
970 
971 	if (list_length(foreignKeysWithOtherTables) == 0)
972 	{
973 		return false;
974 	}
975 
976 	return true;
977 }
978 
979 
980 /*
981  * GetForeignKeyOids takes in a relationId, and returns a list of OIDs for
982  * foreign constraints that the relation with relationId is involved according
983  * to "flags" argument. See ExtractForeignKeyConstraintsMode enum definition
984  * for usage of the flags.
985  */
986 List *
GetForeignKeyOids(Oid relationId,int flags)987 GetForeignKeyOids(Oid relationId, int flags)
988 {
989 	AttrNumber pgConstraintTargetAttrNumber = InvalidAttrNumber;
990 
991 	bool extractReferencing = (flags & INCLUDE_REFERENCING_CONSTRAINTS);
992 	bool extractReferenced = (flags & INCLUDE_REFERENCED_CONSTRAINTS);
993 
994 	/*
995 	 * Only one of them should be passed at a time since the way we scan
996 	 * pg_constraint differs for those columns. Anum_pg_constraint_conrelid
997 	 * supports index scan while Anum_pg_constraint_confrelid does not.
998 	 */
999 	Assert(!(extractReferencing && extractReferenced));
1000 	Assert(extractReferencing || extractReferenced);
1001 
1002 	bool useIndex = false;
1003 	Oid indexOid = InvalidOid;
1004 
1005 	if (extractReferencing)
1006 	{
1007 		pgConstraintTargetAttrNumber = Anum_pg_constraint_conrelid;
1008 
1009 		useIndex = true;
1010 		indexOid = ConstraintRelidTypidNameIndexId;
1011 	}
1012 	else if (extractReferenced)
1013 	{
1014 		pgConstraintTargetAttrNumber = Anum_pg_constraint_confrelid;
1015 	}
1016 
1017 	bool excludeSelfReference = (flags & EXCLUDE_SELF_REFERENCES);
1018 
1019 	List *foreignKeyOids = NIL;
1020 
1021 	ScanKeyData scanKey[1];
1022 	int scanKeyCount = 1;
1023 
1024 	Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
1025 	ScanKeyInit(&scanKey[0], pgConstraintTargetAttrNumber,
1026 				BTEqualStrategyNumber, F_OIDEQ, relationId);
1027 	SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, indexOid, useIndex,
1028 													NULL, scanKeyCount, scanKey);
1029 
1030 	HeapTuple heapTuple = systable_getnext(scanDescriptor);
1031 	while (HeapTupleIsValid(heapTuple))
1032 	{
1033 		Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
1034 
1035 		if (constraintForm->contype != CONSTRAINT_FOREIGN)
1036 		{
1037 			heapTuple = systable_getnext(scanDescriptor);
1038 			continue;
1039 		}
1040 
1041 		bool inheritedConstraint = OidIsValid(constraintForm->conparentid);
1042 		if (inheritedConstraint)
1043 		{
1044 			/*
1045 			 * We only consider the constraints that are explicitly created on
1046 			 * the table as we already process the constraints from parent tables
1047 			 * implicitly when a command is issued
1048 			 */
1049 			heapTuple = systable_getnext(scanDescriptor);
1050 			continue;
1051 		}
1052 
1053 		Oid constraintId = get_relation_constraint_oid_compat(heapTuple);
1054 
1055 		bool isSelfReference = (constraintForm->conrelid == constraintForm->confrelid);
1056 		if (excludeSelfReference && isSelfReference)
1057 		{
1058 			heapTuple = systable_getnext(scanDescriptor);
1059 			continue;
1060 		}
1061 
1062 		Oid otherTableId = InvalidOid;
1063 		if (extractReferencing)
1064 		{
1065 			otherTableId = constraintForm->confrelid;
1066 		}
1067 		else if (extractReferenced)
1068 		{
1069 			otherTableId = constraintForm->conrelid;
1070 		}
1071 
1072 		if (!IsTableTypeIncluded(otherTableId, flags))
1073 		{
1074 			heapTuple = systable_getnext(scanDescriptor);
1075 			continue;
1076 		}
1077 
1078 		foreignKeyOids = lappend_oid(foreignKeyOids, constraintId);
1079 
1080 		heapTuple = systable_getnext(scanDescriptor);
1081 	}
1082 
1083 	systable_endscan(scanDescriptor);
1084 
1085 	/*
1086 	 * Do not release AccessShareLock yet to prevent modifications to be done
1087 	 * on pg_constraint to make sure that caller will process valid foreign key
1088 	 * constraints through the transaction.
1089 	 */
1090 	table_close(pgConstraint, NoLock);
1091 
1092 	return foreignKeyOids;
1093 }
1094 
1095 
1096 /*
1097  * GetReferencedTableId returns OID of the referenced relation for the foreign
1098  * key with foreignKeyId. If there is no such foreign key, then this function
1099  * returns InvalidOid.
1100  */
1101 Oid
GetReferencedTableId(Oid foreignKeyId)1102 GetReferencedTableId(Oid foreignKeyId)
1103 {
1104 	HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(foreignKeyId));
1105 	if (!HeapTupleIsValid(heapTuple))
1106 	{
1107 		/* no such foreign key */
1108 		return InvalidOid;
1109 	}
1110 
1111 	Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
1112 	Oid referencedTableId = constraintForm->confrelid;
1113 
1114 	ReleaseSysCache(heapTuple);
1115 
1116 	return referencedTableId;
1117 }
1118 
1119 
1120 /*
1121  * GetReferencingTableId returns OID of the referencing relation for the foreign
1122  * key with foreignKeyId. If there is no such foreign key, then this function
1123  * returns InvalidOid.
1124  */
1125 Oid
GetReferencingTableId(Oid foreignKeyId)1126 GetReferencingTableId(Oid foreignKeyId)
1127 {
1128 	HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(foreignKeyId));
1129 	if (!HeapTupleIsValid(heapTuple))
1130 	{
1131 		/* no such foreign key */
1132 		return InvalidOid;
1133 	}
1134 
1135 	Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
1136 	Oid referencingTableId = constraintForm->conrelid;
1137 
1138 	ReleaseSysCache(heapTuple);
1139 
1140 	return referencingTableId;
1141 }
1142 
1143 
1144 /*
1145  * IsTableTypeIncluded returns true if type of the table with relationId (distributed,
1146  * reference, Citus local or Postgres local) is included in the flags, false if not
1147  */
1148 static bool
IsTableTypeIncluded(Oid relationId,int flags)1149 IsTableTypeIncluded(Oid relationId, int flags)
1150 {
1151 	if (!IsCitusTable(relationId))
1152 	{
1153 		return (flags & INCLUDE_LOCAL_TABLES) != 0;
1154 	}
1155 	else if (IsCitusTableType(relationId, DISTRIBUTED_TABLE))
1156 	{
1157 		return (flags & INCLUDE_DISTRIBUTED_TABLES) != 0;
1158 	}
1159 	else if (IsCitusTableType(relationId, REFERENCE_TABLE))
1160 	{
1161 		return (flags & INCLUDE_REFERENCE_TABLES) != 0;
1162 	}
1163 	else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
1164 	{
1165 		return (flags & INCLUDE_CITUS_LOCAL_TABLES) != 0;
1166 	}
1167 	return false;
1168 }
1169 
1170 
1171 /*
1172  * GetForeignConstraintCommandsToReferenceTable takes in a shardInterval, and
1173  * returns the list of commands that are required to create the foreign
1174  * constraints for that shardInterval.
1175  *
1176  * The function does the following hack:
1177  *    - Create the foreign constraints as INVALID on the shards
1178  *    - Manually update pg_constraint to mark the same foreign
1179  *      constraints as VALID
1180  *
1181  * We implement the above hack because we aim to skip the validation phase
1182  * of foreign keys to reference tables. The validation is pretty costly and
1183  * given that the source placements already valid, the validation in the
1184  * target nodes is useless.
1185  *
1186  * The function does not apply the same logic for the already invalid foreign
1187  * constraints.
1188  */
1189 List *
GetForeignConstraintCommandsToReferenceTable(ShardInterval * shardInterval)1190 GetForeignConstraintCommandsToReferenceTable(ShardInterval *shardInterval)
1191 {
1192 	ScanKeyData scanKey[1];
1193 	int scanKeyCount = 1;
1194 	uint64 shardId = shardInterval->shardId;
1195 	Oid relationId = shardInterval->relationId;
1196 
1197 	List *commandList = NIL;
1198 
1199 	/*
1200 	 * Set search_path to NIL so that all objects outside of pg_catalog will be
1201 	 * schema-prefixed. pg_catalog will be added automatically when we call
1202 	 * PushOverrideSearchPath(), since we set addCatalog to true;
1203 	 */
1204 	OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
1205 	overridePath->schemas = NIL;
1206 	overridePath->addCatalog = true;
1207 	PushOverrideSearchPath(overridePath);
1208 
1209 	/* open system catalog and scan all constraints that belong to this table */
1210 	Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
1211 	ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
1212 				relationId);
1213 
1214 	SysScanDesc scanDescriptor = systable_beginscan(pgConstraint,
1215 													ConstraintRelidTypidNameIndexId,
1216 													true, NULL, scanKeyCount, scanKey);
1217 
1218 	HeapTuple heapTuple = systable_getnext(scanDescriptor);
1219 	while (HeapTupleIsValid(heapTuple))
1220 	{
1221 		Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
1222 		char *constraintDefinition = NULL;
1223 
1224 
1225 		if (constraintForm->contype != CONSTRAINT_FOREIGN)
1226 		{
1227 			heapTuple = systable_getnext(scanDescriptor);
1228 			continue;
1229 		}
1230 
1231 		Oid referencedRelationId = constraintForm->confrelid;
1232 		if (PartitionMethod(referencedRelationId) != DISTRIBUTE_BY_NONE)
1233 		{
1234 			heapTuple = systable_getnext(scanDescriptor);
1235 			continue;
1236 		}
1237 
1238 		Oid constraintId = get_relation_constraint_oid(relationId,
1239 													   constraintForm->conname.data,
1240 													   true);
1241 
1242 		int64 referencedShardId = GetFirstShardId(referencedRelationId);
1243 		Oid referencedSchemaId = get_rel_namespace(referencedRelationId);
1244 		char *referencedSchemaName = get_namespace_name(referencedSchemaId);
1245 		char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName);
1246 
1247 		Oid schemaId = get_rel_namespace(relationId);
1248 		char *schemaName = get_namespace_name(schemaId);
1249 		char *escapedSchemaName = quote_literal_cstr(schemaName);
1250 
1251 		/*
1252 		 * We're first marking the constraint's valid field as invalid
1253 		 * and get the constraint definition. Later, we mark the constraint
1254 		 * as valid back with directly updating to pg_constraint.
1255 		 */
1256 		if (constraintForm->convalidated == true)
1257 		{
1258 			UpdateConstraintIsValid(constraintId, false);
1259 			constraintDefinition = pg_get_constraintdef_command(constraintId);
1260 			UpdateConstraintIsValid(constraintId, true);
1261 		}
1262 		else
1263 		{
1264 			/* if the constraint is not valid, simply do nothing special */
1265 			constraintDefinition = pg_get_constraintdef_command(constraintId);
1266 		}
1267 
1268 		StringInfo applyForeignConstraintCommand = makeStringInfo();
1269 		appendStringInfo(applyForeignConstraintCommand,
1270 						 WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId,
1271 						 escapedSchemaName, referencedShardId,
1272 						 escapedReferencedSchemaName,
1273 						 quote_literal_cstr(constraintDefinition));
1274 		commandList = lappend(commandList, applyForeignConstraintCommand->data);
1275 
1276 		/* mark the constraint as valid again on the shard */
1277 		if (constraintForm->convalidated == true)
1278 		{
1279 			StringInfo markConstraintValid = makeStringInfo();
1280 			char *qualifiedReferencingShardName =
1281 				ConstructQualifiedShardName(shardInterval);
1282 
1283 			char *shardConstraintName = pstrdup(constraintForm->conname.data);
1284 			AppendShardIdToName(&shardConstraintName, shardId);
1285 
1286 			appendStringInfo(markConstraintValid,
1287 							 "UPDATE pg_constraint SET convalidated = true WHERE "
1288 							 "conrelid = %s::regclass AND conname = '%s'",
1289 							 quote_literal_cstr(qualifiedReferencingShardName),
1290 							 shardConstraintName);
1291 			commandList = lappend(commandList, markConstraintValid->data);
1292 		}
1293 
1294 		heapTuple = systable_getnext(scanDescriptor);
1295 	}
1296 
1297 	/* clean up scan and close system catalog */
1298 	systable_endscan(scanDescriptor);
1299 	table_close(pgConstraint, AccessShareLock);
1300 
1301 	/* revert back to original search_path */
1302 	PopOverrideSearchPath();
1303 
1304 	return commandList;
1305 }
1306 
1307 
1308 /*
1309  * UpdateConstraintIsValid is a utility function with sets the
1310  * pg_constraint.convalidated to the given isValid for the given
1311  * constraintId.
1312  *
1313  * This function should be called with caution because if used wrong
1314  * could lead to data inconsistencies.
1315  */
1316 static void
UpdateConstraintIsValid(Oid constraintId,bool isValid)1317 UpdateConstraintIsValid(Oid constraintId, bool isValid)
1318 {
1319 	ScanKeyData scankey[1];
1320 	Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
1321 	TupleDesc tupleDescriptor = RelationGetDescr(pgConstraint);
1322 	Datum values[Natts_pg_constraint];
1323 	bool isnull[Natts_pg_constraint];
1324 	bool replace[Natts_pg_constraint];
1325 
1326 	ScanKeyInit(&scankey[0],
1327 				Anum_pg_constraint_oid,
1328 				BTEqualStrategyNumber, F_OIDEQ,
1329 				ObjectIdGetDatum(constraintId));
1330 
1331 	SysScanDesc scanDescriptor = systable_beginscan(pgConstraint,
1332 													ConstraintOidIndexId,
1333 													true,
1334 													NULL,
1335 													1,
1336 													scankey);
1337 	HeapTuple heapTuple = systable_getnext(scanDescriptor);
1338 	if (!HeapTupleIsValid(heapTuple))
1339 	{
1340 		elog(ERROR, "could not find tuple for constraint %u", constraintId);
1341 	}
1342 
1343 	memset(replace, 0, sizeof(replace));
1344 
1345 	values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValid);
1346 	isnull[Anum_pg_constraint_convalidated - 1] = false;
1347 	replace[Anum_pg_constraint_convalidated - 1] = true;
1348 
1349 	heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
1350 
1351 	CatalogTupleUpdate(pgConstraint, &heapTuple->t_self, heapTuple);
1352 
1353 	CacheInvalidateHeapTuple(pgConstraint, heapTuple, NULL);
1354 	CommandCounterIncrement();
1355 
1356 	systable_endscan(scanDescriptor);
1357 	table_close(pgConstraint, NoLock);
1358 }
1359 
1360 
1361 /*
1362  * RelationInvolvedInAnyNonInheritedForeignKeys returns true if relation involved
1363  * in a foreign key that is not inherited from its parent relation.
1364  */
1365 bool
RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId)1366 RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId)
1367 {
1368 	List *referencingForeignKeys = GetForeignKeyOids(relationId,
1369 													 INCLUDE_REFERENCING_CONSTRAINTS |
1370 													 INCLUDE_ALL_TABLE_TYPES);
1371 
1372 	/*
1373 	 * We already capture self-referencing foreign keys above, so use
1374 	 * EXCLUDE_SELF_REFERENCES here
1375 	 */
1376 	List *referencedForeignKeys = GetForeignKeyOids(relationId,
1377 													INCLUDE_REFERENCED_CONSTRAINTS |
1378 													EXCLUDE_SELF_REFERENCES |
1379 													INCLUDE_ALL_TABLE_TYPES);
1380 	List *foreignKeysRelationInvolved = list_concat(referencingForeignKeys,
1381 													referencedForeignKeys);
1382 	Oid foreignKeyId = InvalidOid;
1383 	foreach_oid(foreignKeyId, foreignKeysRelationInvolved)
1384 	{
1385 		HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(foreignKeyId));
1386 		if (!HeapTupleIsValid(heapTuple))
1387 		{
1388 			/* not possible but be on the safe side */
1389 			continue;
1390 		}
1391 
1392 		Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
1393 		Oid parentConstraintId = constraintForm->conparentid;
1394 		if (!OidIsValid(parentConstraintId))
1395 		{
1396 			return true;
1397 		}
1398 	}
1399 
1400 	return false;
1401 }
1402