/*-------------------------------------------------------------------------
 *
 * create_distributed_table.c
 *	  Routines related to the creation of distributed relations.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "miscadmin.h"

#include "distributed/pg_version_constants.h"
#include "distributed/commands/utility_hook.h"

#include "access/genam.h"
#include "access/hash.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_enum.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_trigger.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/distribution_column.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/shared_library_init.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "distributed/version_compat.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "nodes/execnodes.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
#include "parser/parse_expr.h"
#include "parser/parse_node.h"
#include "parser/parse_relation.h"
#include "parser/parser.h"
#include "storage/lmgr.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/inval.h"

/*
 * The progress of the copy is logged once every LOG_PER_TUPLE_AMOUNT tuples.
 */
#define LOG_PER_TUPLE_AMOUNT 1000000

/* local function forward declarations */
static char DecideReplicationModel(char distributionMethod, char *colocateWithTableName,
								   bool viaDeprecatedAPI);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
											 Oid colocatedTableId, bool localTableEmpty);
static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
									  char distributionMethod, char replicationModel,
									  int shardCount, bool shardCountIsStrict,
									  char *colocateWithTableName,
									  bool viaDeprecatedAPI);
static void EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
										   char distributionMethod, uint32 colocationId,
										   char replicationModel, bool viaDeprecatedAPI);
static void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
										  Oid distributionColumnType,
										  Oid sourceRelationId);
static void EnsureLocalTableEmpty(Oid relationId);
static void EnsureRelationHasNoTriggers(Oid relationId);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
									int16 supportFunctionNumber);
static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod,
											 bool viaDeprecatedAPI);
static bool ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod, bool
									viaDeprecatedAPI);
static void EnsureCitusTableCanBeCreated(Oid relationOid);
static List * GetFKeyCreationCommandsRelationInvolvedWithTableType(Oid relationId,
																   int tableTypeFlag);
static Oid DropFKeysAndUndistributeTable(Oid relationId);
static void DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag);
static void CopyLocalDataIntoShards(Oid relationId);
static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
														Var *distributionColumn);
static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty);
static void DoCopyFromLocalTableIntoShards(Relation distributedRelation,
										   DestReceiver *copyDest,
										   TupleTableSlot *slot,
										   EState *estate);

/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
PG_FUNCTION_INFO_V1(create_distributed_table);
PG_FUNCTION_INFO_V1(create_reference_table);


/*
 * master_create_distributed_table accepts a table, distribution column and
 * method and performs the corresponding catalog changes.
 *
 * Note that this UDF is deprecated and cannot create colocated tables, so we
 * always use INVALID_COLOCATION_ID.
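 *
 * Example usage (SQL; the table and column names are illustrative):
 *
 *   SELECT master_create_distributed_table('events', 'user_id', 'hash');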
 */
Datum
master_create_distributed_table(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	Oid relationId = PG_GETARG_OID(0);
	text *distributionColumnText = PG_GETARG_TEXT_P(1);
	Oid distributionMethodOid = PG_GETARG_OID(2);

	EnsureCitusTableCanBeCreated(relationId);

	char *colocateWithTableName = NULL;
	bool viaDeprecatedAPI = true;

	/*
	 * Lock target relation with an exclusive lock - there's no way to make
	 * sense of this table until we've committed, and we don't want multiple
	 * backends manipulating this relation.
	 */
	Relation relation = try_relation_open(relationId, ExclusiveLock);

	if (relation == NULL)
	{
		ereport(ERROR, (errmsg("could not create distributed table: "
							   "relation does not exist")));
	}

	char *distributionColumnName = text_to_cstring(distributionColumnText);
	Var *distributionColumn = BuildDistributionKeyFromColumnName(relation,
																 distributionColumnName);
	Assert(distributionColumn != NULL);
	char distributionMethod = LookupDistributionMethod(distributionMethodOid);

	CreateDistributedTable(relationId, distributionColumn, distributionMethod,
						   ShardCount, false, colocateWithTableName, viaDeprecatedAPI);

	relation_close(relation, NoLock);

	PG_RETURN_VOID();
}


/*
 * create_distributed_table gets a table name, distribution column,
 * distribution method and colocate_with option, then it creates a
 * distributed table.
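 *
 * Example usage (SQL; names are illustrative, the optional arguments use
 * the parameter names of the SQL-level UDF):
 *
 *   SELECT create_distributed_table('events', 'user_id');
 *   SELECT create_distributed_table('events', 'user_id',
 *                                   colocate_with => 'none', shard_count => 64);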
 */
Datum
create_distributed_table(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
	{
		PG_RETURN_VOID();
	}
	bool viaDeprecatedAPI = false;

	Oid relationId = PG_GETARG_OID(0);
	text *distributionColumnText = PG_GETARG_TEXT_P(1);
	Oid distributionMethodOid = PG_GETARG_OID(2);
	text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
	char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);

	bool shardCountIsStrict = false;
	int shardCount = ShardCount;
	if (!PG_ARGISNULL(4))
	{
		if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
			pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
		{
			ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
								   "and shard_count at the same time")));
		}

		shardCount = PG_GETARG_INT32(4);

		/*
		 * if the shard_count parameter is given, then we have to
		 * make sure the table has that many shards
		 */
		shardCountIsStrict = true;
	}

	EnsureCitusTableCanBeCreated(relationId);

	/* enable create_distributed_table on an empty node */
	InsertCoordinatorIfClusterEmpty();

	/*
	 * Lock target relation with an exclusive lock - there's no way to make
	 * sense of this table until we've committed, and we don't want multiple
	 * backends manipulating this relation.
	 */
	Relation relation = try_relation_open(relationId, ExclusiveLock);
	if (relation == NULL)
	{
		ereport(ERROR, (errmsg("could not create distributed table: "
							   "relation does not exist")));
	}

	relation_close(relation, NoLock);

	char *distributionColumnName = text_to_cstring(distributionColumnText);
	Var *distributionColumn = BuildDistributionKeyFromColumnName(relation,
																 distributionColumnName);
	Assert(distributionColumn != NULL);
	char distributionMethod = LookupDistributionMethod(distributionMethodOid);

	if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
	{
		ereport(ERROR, (errmsg("%d is outside the valid range for "
							   "parameter \"shard_count\" (1 .. %d)",
							   shardCount, MAX_SHARD_COUNT)));
	}

	CreateDistributedTable(relationId, distributionColumn, distributionMethod,
						   shardCount, shardCountIsStrict, colocateWithTableName,
						   viaDeprecatedAPI);

	PG_RETURN_VOID();
}


/*
 * create_reference_table creates a distributed table with the given relationId. The
 * created table has one shard, and its replication factor is set to the active worker
 * count. In fact, that is the definition of a reference table in Citus.
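 *
 * Example usage (SQL; the table name is illustrative):
 *
 *   SELECT create_reference_table('countries');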
 */
Datum
create_reference_table(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	Oid relationId = PG_GETARG_OID(0);

	char *colocateWithTableName = NULL;
	Var *distributionColumn = NULL;

	bool viaDeprecatedAPI = false;

	EnsureCitusTableCanBeCreated(relationId);

	/* enable create_reference_table on an empty node */
	InsertCoordinatorIfClusterEmpty();

	/*
	 * Lock target relation with an exclusive lock - there's no way to make
	 * sense of this table until we've committed, and we don't want multiple
	 * backends manipulating this relation.
	 */
	Relation relation = try_relation_open(relationId, ExclusiveLock);
	if (relation == NULL)
	{
		ereport(ERROR, (errmsg("could not create reference table: "
							   "relation does not exist")));
	}

	relation_close(relation, NoLock);

	List *workerNodeList = ActivePrimaryNodeList(ShareLock);
	int workerCount = list_length(workerNodeList);

	/* if there are no workers, error out */
	if (workerCount == 0)
	{
		char *relationName = get_rel_name(relationId);

		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("cannot create reference table \"%s\"", relationName),
						errdetail("There are no active worker nodes.")));
	}

	CreateDistributedTable(relationId, distributionColumn, DISTRIBUTE_BY_NONE,
						   ShardCount, false, colocateWithTableName, viaDeprecatedAPI);
	PG_RETURN_VOID();
}


/*
 * EnsureCitusTableCanBeCreated checks that
 * - we are on the coordinator
 * - the current user is the owner of the table
 * - the relation kind is supported
 */
static void
EnsureCitusTableCanBeCreated(Oid relationOid)
{
	EnsureCoordinator();
	EnsureRelationExists(relationOid);
	EnsureTableOwner(relationOid);

	/*
	 * We should do this check here since the code in the following lines relies
	 * on this relation to have a supported relation kind. More extensive checks
	 * will be performed in CreateDistributedTable.
	 */
	EnsureRelationKindSupported(relationOid);
}


/*
 * EnsureRelationExists does a basic check on whether the OID belongs to
 * an existing relation.
 */
void
EnsureRelationExists(Oid relationId)
{
	if (!RelationExists(relationId))
	{
		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						errmsg("relation with OID %d does not exist",
							   relationId)));
	}
}


/*
 * CreateDistributedTable creates a distributed table in the given configuration.
 * This function contains all the necessary logic to create distributed tables. It
 * performs the checks required to ensure that distributing the table is safe. If it
 * is safe to distribute the table, this function creates distributed table metadata,
 * creates shards and copies local data to the shards. This function also handles
 * partitioned tables by distributing their partitions as well.
 *
 * The viaDeprecatedAPI boolean flag is not an optimal way to implement this function,
 * but it helps reduce code duplication a lot. We hope to remove that flag one
 * day, once we deprecate master_create_distributed_table completely.
 */
void
CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod,
					   int shardCount, bool shardCountIsStrict,
					   char *colocateWithTableName, bool viaDeprecatedAPI)
{
	/*
	 * EnsureTableNotDistributed errors out when the relation is a citus table, but
	 * we don't want to ask users to first undistribute their citus local tables
	 * when creating reference or distributed tables from them.
	 * For this reason, here we undistribute citus local tables beforehand.
	 * But since UndistributeTable does not support undistributing relations
	 * involved in foreign key relationships, we first drop the foreign keys that
	 * the given relation is involved in, then we undistribute the relation and
	 * finally we re-create the dropped foreign keys at the end of this function.
	 */
	List *originalForeignKeyRecreationCommands = NIL;
	if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
	{
		/* store the creation commands of the foreign keys that the relation is involved in */
		originalForeignKeyRecreationCommands =
			GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId,
																 INCLUDE_ALL_TABLE_TYPES);
		relationId = DropFKeysAndUndistributeTable(relationId);
	}

	/*
	 * To support foreign keys between reference tables and local tables,
	 * we drop & re-define foreign keys at the end of this function so
	 * that the ALTER TABLE hook does the necessary job, which means converting
	 * local tables to citus local tables to properly support such foreign
	 * keys.
	 *
	 * This function does not expect to create a Citus local table, so we blindly
	 * create a reference table when the method is DISTRIBUTE_BY_NONE.
	 */
	else if (distributionMethod == DISTRIBUTE_BY_NONE &&
			 ShouldEnableLocalReferenceForeignKeys() &&
			 HasForeignKeyWithLocalTable(relationId))
	{
		/*
		 * Store foreign key creation commands for foreign key relationships
		 * that the relation has with postgres tables.
		 */
		originalForeignKeyRecreationCommands =
			GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId,
																 INCLUDE_LOCAL_TABLES);

		/*
		 * Soon we will convert local tables to citus local tables. As
		 * CreateCitusLocalTable needs to use local execution, now we
		 * switch to local execution beforehand so that reference table
		 * creation doesn't use remote execution and we don't error out
		 * in CreateCitusLocalTable.
		 */
		SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);

		DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_LOCAL_TABLES);
	}

	/*
	 * Distributed tables might have dependencies on different objects. Since we
	 * create shards for a distributed table via multiple sessions, these objects
	 * will be created via their own connection and committed immediately so they
	 * become visible to all sessions creating shards.
	 */
	ObjectAddress tableAddress = { 0 };
	ObjectAddressSet(tableAddress, RelationRelationId, relationId);
	EnsureDependenciesExistOnAllNodes(&tableAddress);

	char replicationModel = DecideReplicationModel(distributionMethod,
												   colocateWithTableName,
												   viaDeprecatedAPI);


	/*
	 * Due to dropping columns, the parent's distribution key may not match the
	 * partition's distribution key. The input distributionColumn belongs to
	 * the parent. That's why we override the distribution column of partitions
	 * here. See issue #5123 for details.
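	 *
	 * For example (illustrative): if the parent was created with columns
	 * (a int, b int), column "a" was later dropped, and the partition was
	 * created afterwards, then "b" has attno 2 in the parent but attno 1
	 * in the partition, so the parent's Var cannot be used for the
	 * partition as-is.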
	 */
	if (PartitionTable(relationId))
	{
		Oid parentRelationId = PartitionParentOid(relationId);
		char *distributionColumnName =
			ColumnToColumnName(parentRelationId, nodeToString(distributionColumn));

		distributionColumn =
			FindColumnWithNameOnTargetRelation(parentRelationId, distributionColumnName,
											   relationId);
	}

	/*
	 * ColocationIdForNewTable assumes the caller acquires a lock on relationId.
	 * In our case, our caller already acquired a lock on relationId.
	 */
	uint32 colocationId = ColocationIdForNewTable(relationId, distributionColumn,
												  distributionMethod, replicationModel,
												  shardCount, shardCountIsStrict,
												  colocateWithTableName,
												  viaDeprecatedAPI);

	EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod,
								   colocationId, replicationModel, viaDeprecatedAPI);

	/*
	 * Make sure that existing reference tables have been replicated to all the nodes
	 * such that we can create foreign keys and joins work immediately after creation.
	 */
	EnsureReferenceTablesExistOnAllNodes();

	/* we need to calculate these variables before creating distributed metadata */
	bool localTableEmpty = TableEmpty(relationId);
	Oid colocatedTableId = ColocatedTableId(colocationId);

	/* create an entry for the distributed table in pg_dist_partition */
	InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
							  colocationId, replicationModel);

	/*
	 * Ensure that the sequences used in column defaults of the table
	 * have proper types
	 */
	List *attnumList = NIL;
	List *dependentSequenceList = NIL;
	GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);
	EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
										  attnumList);

	/* foreign tables do not support TRUNCATE triggers */
	if (RegularTable(relationId))
	{
		CreateTruncateTrigger(relationId);
	}

	/*
	 * If we are using master_create_distributed_table, we don't need to continue,
	 * because the deprecated API does not support the following features.
	 */
	if (viaDeprecatedAPI)
	{
		Assert(colocateWithTableName == NULL);
		return;
	}

	/* create shards for hash distributed and reference tables */
	if (distributionMethod == DISTRIBUTE_BY_HASH)
	{
		CreateHashDistributedTableShards(relationId, shardCount, colocatedTableId,
										 localTableEmpty);
	}
	else if (distributionMethod == DISTRIBUTE_BY_NONE)
	{
		/*
		 * This function does not expect to create a Citus local table, so we blindly
		 * create a reference table when the method is DISTRIBUTE_BY_NONE.
		 */
		CreateReferenceTableShard(relationId);
	}

	if (ShouldSyncTableMetadata(relationId))
	{
		if (ClusterHasKnownMetadataWorkers())
		{
			/*
			 * Ensure sequence dependencies and mark them as distributed
			 * before creating table metadata on workers
			 */
			MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList);
		}

		CreateTableMetadataOnWorkers(relationId);
	}

	/*
	 * We have a custom way of invalidating the foreign key graph, see
	 * InvalidateForeignKeyGraph().
	 */
	if (TableReferenced(relationId) || TableReferencing(relationId))
	{
		InvalidateForeignKeyGraph();
	}

	/* if this table is a partitioned table, distribute its partitions too */
	if (PartitionedTable(relationId))
	{
		List *partitionList = PartitionList(relationId);
		Oid partitionRelationId = InvalidOid;
		Oid namespaceId = get_rel_namespace(relationId);
		char *schemaName = get_namespace_name(namespaceId);
		char *relationName = get_rel_name(relationId);
		char *parentRelationName = quote_qualified_identifier(schemaName, relationName);

		foreach_oid(partitionRelationId, partitionList)
		{
			CreateDistributedTable(partitionRelationId, distributionColumn,
								   distributionMethod, shardCount, false,
								   parentRelationName, viaDeprecatedAPI);
		}
	}

	/* copy over data for hash distributed and reference tables */
	if (distributionMethod == DISTRIBUTE_BY_HASH ||
		distributionMethod == DISTRIBUTE_BY_NONE)
	{
		if (RegularTable(relationId))
		{
			CopyLocalDataIntoShards(relationId);
		}
	}

	/*
	 * Now recreate the foreign keys that we dropped beforehand. As modifications are
	 * not allowed on the relations that are involved in the foreign key relationship,
	 * we can skip the validation of the foreign keys.
	 */
	bool skip_validation = true;
	ExecuteForeignKeyCreateCommandList(originalForeignKeyRecreationCommands,
									   skip_validation);
}


/*
 * EnsureSequenceTypeSupported ensures that the type of the column that uses
 * a sequence on its DEFAULT is consistent with previous uses (if any) of the
 * sequence in distributed tables.
 * If any other distributed table uses the input sequence, it checks whether
 * the types of the columns using the sequence match. If they don't, it errors out.
 * Otherwise, the condition is ensured.
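 *
 * For example (a sketch): if distributed table t1 has a column
 * "c int DEFAULT nextval('s')" and we then try to distribute t2 with
 * "c bigint DEFAULT nextval('s')", this check errors out because the
 * column types using sequence s differ.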
 */
void
EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId)
{
	List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
	Oid citusTableId = InvalidOid;
	foreach_oid(citusTableId, citusTableIdList)
	{
		List *attnumList = NIL;
		List *dependentSequenceList = NIL;
		GetDependentSequencesWithRelation(citusTableId, &attnumList,
										  &dependentSequenceList, 0);
		ListCell *attnumCell = NULL;
		ListCell *dependentSequenceCell = NULL;
		forboth(attnumCell, attnumList, dependentSequenceCell,
				dependentSequenceList)
		{
			AttrNumber currentAttnum = lfirst_int(attnumCell);
			Oid currentSeqOid = lfirst_oid(dependentSequenceCell);

			/*
			 * If another distributed table is using the same sequence
			 * in one of its column defaults, make sure the types of the
			 * columns match
			 */
			if (currentSeqOid == seqOid)
			{
				Oid currentSeqTypId = GetAttributeTypeOid(citusTableId,
														  currentAttnum);
				if (seqTypId != currentSeqTypId)
				{
					char *sequenceName = generate_qualified_relation_name(
						seqOid);
					char *citusTableName =
						generate_qualified_relation_name(citusTableId);
					ereport(ERROR, (errmsg(
										"The sequence %s is already used for a different"
										" type in column %d of the table %s",
										sequenceName, currentAttnum,
										citusTableName)));
				}
			}
		}
	}
}


/*
 * AlterSequenceType alters the given sequence's type to the given type.
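 *
 * Effectively this issues the equivalent of
 *
 *   ALTER SEQUENCE <sequence> AS <type>;
 *
 * and is a no-op when the sequence already has the requested type.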
 */
void
AlterSequenceType(Oid seqOid, Oid typeOid)
{
	Form_pg_sequence sequenceData = pg_get_sequencedef(seqOid);
	Oid currentSequenceTypeOid = sequenceData->seqtypid;
	if (currentSequenceTypeOid != typeOid)
	{
		AlterSeqStmt *alterSequenceStatement = makeNode(AlterSeqStmt);
		char *seqNamespace = get_namespace_name(get_rel_namespace(seqOid));
		char *seqName = get_rel_name(seqOid);
		alterSequenceStatement->sequence = makeRangeVar(seqNamespace, seqName, -1);
		Node *asTypeNode = (Node *) makeTypeNameFromOid(typeOid, -1);
		SetDefElemArg(alterSequenceStatement, "as", asTypeNode);
		ParseState *pstate = make_parsestate(NULL);
		AlterSequence(pstate, alterSequenceStatement);
	}
}


/*
 * MarkSequenceListDistributedAndPropagateDependencies ensures that the
 * dependencies of the sequences in the given list exist on all nodes and
 * marks the sequences as distributed.
 */
void
MarkSequenceListDistributedAndPropagateDependencies(List *sequenceList)
{
	Oid sequenceOid = InvalidOid;
	foreach_oid(sequenceOid, sequenceList)
	{
		MarkSequenceDistributedAndPropagateDependencies(sequenceOid);
	}
}


/*
 * MarkSequenceDistributedAndPropagateDependencies ensures that the
 * dependencies of the given sequence exist on all nodes and marks the
 * sequence as distributed.
 */
void
MarkSequenceDistributedAndPropagateDependencies(Oid sequenceOid)
{
	/* get the sequence address */
	ObjectAddress sequenceAddress = { 0 };
	ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
	EnsureDependenciesExistOnAllNodes(&sequenceAddress);
	MarkObjectDistributed(&sequenceAddress);
}


/*
 * EnsureDistributedSequencesHaveOneType first ensures that the type of the column
 * in which the sequence is used as default is supported for each sequence in the
 * input dependentSequenceList, and then alters the sequence type if it is not the
 * same as the column type.
 */
void
EnsureDistributedSequencesHaveOneType(Oid relationId, List *dependentSequenceList,
									  List *attnumList)
{
	ListCell *attnumCell = NULL;
	ListCell *dependentSequenceCell = NULL;
	forboth(attnumCell, attnumList, dependentSequenceCell, dependentSequenceList)
	{
		AttrNumber attnum = lfirst_int(attnumCell);
		Oid sequenceOid = lfirst_oid(dependentSequenceCell);

		/*
		 * We should make sure that the type of the column that uses
		 * that sequence is supported
		 */
		Oid seqTypId = GetAttributeTypeOid(relationId, attnum);
		EnsureSequenceTypeSupported(sequenceOid, seqTypId);

		/*
		 * Alter the sequence's data type on the coordinator if needed.
		 * A sequence's type is bigint by default and it doesn't change even if
		 * it's used in an int column. We should change the type if needed,
		 * and not allow future ALTER SEQUENCE ... TYPE ... commands for
		 * sequences used as defaults in distributed tables
		 */
		AlterSequenceType(sequenceOid, seqTypId);
	}
}


/*
 * GetFKeyCreationCommandsRelationInvolvedWithTableType returns a list of DDL
 * commands to recreate the foreign keys that the relation with relationId is
 * involved in with the given table type.
 */
static List *
GetFKeyCreationCommandsRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag)
{
	int referencingFKeysFlag = INCLUDE_REFERENCING_CONSTRAINTS |
							   tableTypeFlag;
	List *referencingFKeyCreationCommands =
		GetForeignConstraintCommandsInternal(relationId, referencingFKeysFlag);

	/* already captured self referencing foreign keys, so use EXCLUDE_SELF_REFERENCES */
	int referencedFKeysFlag = INCLUDE_REFERENCED_CONSTRAINTS |
							  EXCLUDE_SELF_REFERENCES |
							  tableTypeFlag;
	List *referencedFKeyCreationCommands =
		GetForeignConstraintCommandsInternal(relationId, referencedFKeysFlag);
	return list_concat(referencingFKeyCreationCommands, referencedFKeyCreationCommands);
}


/*
 * DropFKeysAndUndistributeTable drops all foreign keys that the relation with
 * relationId is involved in, then undistributes it.
 * Note that as UndistributeTable changes the relationId of the relation, this
 * function also returns the new relationId of the relation.
 * Also note that callers are responsible for storing & recreating the dropped
 * foreign keys if needed.
 */
static Oid
DropFKeysAndUndistributeTable(Oid relationId)
{
	DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_ALL_TABLE_TYPES);

	/* store them before calling UndistributeTable as it changes relationId */
	char *relationName = get_rel_name(relationId);
	Oid schemaId = get_rel_namespace(relationId);

	/* suppress notice messages to avoid being too verbose */
	TableConversionParameters params = {
		.relationId = relationId,
		.cascadeViaForeignKeys = false,
		.suppressNoticeMessages = true
	};
	UndistributeTable(&params);

	Oid newRelationId = get_relname_relid(relationName, schemaId);

	/*
	 * We don't expect this to happen, but to be on the safe side let's error
	 * out here.
	 */
	EnsureRelationExists(newRelationId);

	return newRelationId;
}


/*
 * DropFKeysRelationInvolvedWithTableType drops the foreign keys that the relation
 * with relationId is involved in with the given table type.
 */
static void
DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag)
{
	int referencingFKeysFlag = INCLUDE_REFERENCING_CONSTRAINTS |
							   tableTypeFlag;
	DropRelationForeignKeys(relationId, referencingFKeysFlag);

	/* already captured self referencing foreign keys, so use EXCLUDE_SELF_REFERENCES */
	int referencedFKeysFlag = INCLUDE_REFERENCED_CONSTRAINTS |
							  EXCLUDE_SELF_REFERENCES |
							  tableTypeFlag;
	DropRelationForeignKeys(relationId, referencedFKeysFlag);
}


/*
 * DecideReplicationModel decides which replication model should be
 * used depending on the given distribution configuration.
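 *
 * In short (a summary of the branches below): the deprecated API and
 * non-hash distribution fall back to statement-based (coordinator)
 * replication, reference tables use 2PC, colocated tables inherit the
 * model of the colocation target, and hash distribution defaults to
 * streaming replication unless distributed table replication is enabled.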
 */
static char
DecideReplicationModel(char distributionMethod, char *colocateWithTableName,
					   bool viaDeprecatedAPI)
{
	if (viaDeprecatedAPI)
	{
		return REPLICATION_MODEL_COORDINATOR;
	}
	else if (distributionMethod == DISTRIBUTE_BY_NONE)
	{
		return REPLICATION_MODEL_2PC;
	}
	else if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
			 !IsColocateWithNone(colocateWithTableName))
	{
		text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
		Oid colocatedRelationId = ResolveRelationId(colocateWithTableNameText, false);
		CitusTableCacheEntry *targetTableEntry = GetCitusTableCacheEntry(
			colocatedRelationId);
		char replicationModel = targetTableEntry->replicationModel;

		return replicationModel;
	}
	else if (distributionMethod == DISTRIBUTE_BY_HASH &&
			 !DistributedTableReplicationIsEnabled())
	{
		return REPLICATION_MODEL_STREAMING;
	}
	else
	{
		return REPLICATION_MODEL_COORDINATOR;
	}

	/* we should not reach this point */
	return REPLICATION_MODEL_INVALID;
}


/*
 * CreateHashDistributedTableShards creates the shards of the given hash
 * distributed table.
 */
static void
CreateHashDistributedTableShards(Oid relationId, int shardCount,
								 Oid colocatedTableId, bool localTableEmpty)
{
	bool useExclusiveConnection = false;

	/*
	 * Decide whether to use exclusive connections per placement or not. Note that
	 * if the local table is not empty, we cannot use sequential mode since the COPY
	 * operation that'd load the data into shards currently requires exclusive
	 * connections.
	 */
	if (RegularTable(relationId))
	{
		useExclusiveConnection = CanUseExclusiveConnections(relationId,
															localTableEmpty);
	}

	if (colocatedTableId != InvalidOid)
	{
		CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
	}
	else
	{
		/*
		 * This path is only reached by create_distributed_table for distributed
		 * tables that will not be part of an existing colocation group. Therefore,
		 * we can directly use the ShardReplicationFactor global variable here.
		 */
		CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor,
										 useExclusiveConnection);
	}
}


/*
 * ColocationIdForNewTable returns a colocation id for a hash-distributed table
 * according to the given configuration. If there is no such configuration, it
 * creates one and returns the colocation id of the newly created colocation group.
 * For append and range distributed tables, this function errors out if the
 * colocateWithTableName parameter is set to something other than the default,
 * otherwise it directly returns INVALID_COLOCATION_ID.
 *
 * This function assumes its caller takes the necessary lock on relationId to
 * prevent possible changes on it.
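 *
 * For example, two hash-distributed tables created with colocate_with set
 * to 'default' and the same shard count, replication factor, distribution
 * column type, and collation end up in the same colocation group.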
 */
static uint32
ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
						char distributionMethod, char replicationModel,
						int shardCount, bool shardCountIsStrict,
						char *colocateWithTableName, bool viaDeprecatedAPI)
{
	uint32 colocationId = INVALID_COLOCATION_ID;

	if (viaDeprecatedAPI)
	{
		return colocationId;
	}
	else if (distributionMethod == DISTRIBUTE_BY_APPEND ||
			 distributionMethod == DISTRIBUTE_BY_RANGE)
	{
		if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0)
		{
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("cannot distribute relation"),
							errdetail("Currently, colocate_with option is only supported "
									  "for hash distributed tables.")));
		}

		return colocationId;
	}
	else if (distributionMethod == DISTRIBUTE_BY_NONE)
	{
		return CreateReferenceTableColocationId();
	}
	else
	{
		/*
		 * Get an exclusive lock on the colocation system catalog. Therefore, we
		 * can be sure that there will be no modifications on the colocation table
		 * until this transaction is committed.
		 */
		Assert(distributionMethod == DISTRIBUTE_BY_HASH);

		Relation pgDistColocation = table_open(DistColocationRelationId(), ExclusiveLock);

		Oid distributionColumnType = distributionColumn->vartype;
		Oid distributionColumnCollation = get_typcollation(distributionColumnType);
		bool createdColocationGroup = false;

		if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
		{
			/* check for a default colocation group */
			colocationId = ColocationId(shardCount, ShardReplicationFactor,
										distributionColumnType,
										distributionColumnCollation);

			/*
			 * if the shardCount is strict, then we check whether the shard count
			 * of the colocated table is actually shardCount
			 */
			if (shardCountIsStrict && colocationId != INVALID_COLOCATION_ID)
			{
				Oid colocatedTableId = ColocatedTableId(colocationId);

				if (colocatedTableId != InvalidOid)
				{
					CitusTableCacheEntry *cacheEntry =
						GetCitusTableCacheEntry(colocatedTableId);
					int colocatedTableShardCount = cacheEntry->shardIntervalArrayLength;

					if (colocatedTableShardCount != shardCount)
					{
						colocationId = INVALID_COLOCATION_ID;
					}
				}
			}

			if (colocationId == INVALID_COLOCATION_ID)
			{
				colocationId = CreateColocationGroup(shardCount, ShardReplicationFactor,
													 distributionColumnType,
													 distributionColumnCollation);
				createdColocationGroup = true;
			}
		}
		else if (IsColocateWithNone(colocateWithTableName))
		{
			colocationId = GetNextColocationId();

			createdColocationGroup = true;
		}
		else
		{
			text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
			Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText, false);

			EnsureTableCanBeColocatedWith(relationId, replicationModel,
										  distributionColumnType, sourceRelationId);

			colocationId = TableColocationId(sourceRelationId);
		}

		/*
		 * If we created a new colocation group then we need to keep the lock to
		 * prevent a concurrent create_distributed_table call from creating another
		 * colocation group with the same parameters. If we're using an existing
		 * colocation group then other transactions will use the same one.
		 */
		if (createdColocationGroup)
		{
			/* keep the exclusive lock */
			table_close(pgDistColocation, NoLock);
		}
		else
		{
			/* release the exclusive lock */
			table_close(pgDistColocation, ExclusiveLock);
		}
	}

	return colocationId;
}


/*
 * EnsureRelationCanBeDistributed checks whether Citus can safely distribute the
 * given relation with the given configuration. We perform almost all safety checks
 * for distributing the table here. If there is an unsatisfied requirement, we error
 * out and do not distribute the table.
 *
 * This function assumes callers have already acquired the necessary locks to ensure
 * there will not be any change in the given relation.
 */
static void
EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
							   char distributionMethod, uint32 colocationId,
							   char replicationModel, bool viaDeprecatedAPI)
{
	Oid parentRelationId = InvalidOid;

	EnsureTableNotDistributed(relationId);
	EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod, viaDeprecatedAPI);
	EnsureRelationHasNoTriggers(relationId);

	/* we assume callers took the necessary locks */
	Relation relation = relation_open(relationId, NoLock);
	TupleDesc relationDesc = RelationGetDescr(relation);
	char *relationName = RelationGetRelationName(relation);

	ErrorIfTableIsACatalogTable(relation);

	/* verify that the target relation does not use identity columns */
	if (RelationUsesIdentityColumns(relationDesc))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot distribute relation: %s", relationName),
						errdetail("Distributed relations must not use GENERATED "
								  "... AS IDENTITY.")));
	}

	/* verify that the target relation is not distributed by a generated column */
	if (distributionMethod != DISTRIBUTE_BY_NONE &&
		DistributionColumnUsesGeneratedStoredColumn(relationDesc, distributionColumn))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot distribute relation: %s", relationName),
						errdetail("Distribution column must not use GENERATED ALWAYS "
								  "AS (...) STORED.")));
	}

	/* check for the support function needed by the specified partition method */
	if (distributionMethod == DISTRIBUTE_BY_HASH)
	{
		Oid hashSupportFunction = SupportFunctionForColumn(distributionColumn,
														   HASH_AM_OID,
														   HASHSTANDARD_PROC);
		if (hashSupportFunction == InvalidOid)
		{
			ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
							errmsg("could not identify a hash function for type %s",
								   format_type_be(distributionColumn->vartype)),
							errdatatype(distributionColumn->vartype),
							errdetail("Partition column types must have a hash function "
									  "defined to use hash partitioning.")));
		}

		if (distributionColumn->varcollid != InvalidOid &&
			!get_collation_isdeterministic(distributionColumn->varcollid))
		{
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("Hash distributed partition columns may not use "
								   "a non deterministic collation")));
		}
	}
	else if (distributionMethod == DISTRIBUTE_BY_RANGE)
	{
		Oid btreeSupportFunction = SupportFunctionForColumn(distributionColumn,
															BTREE_AM_OID, BTORDER_PROC);
		if (btreeSupportFunction == InvalidOid)
		{
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_FUNCTION),
					 errmsg("could not identify a comparison function for type %s",
							format_type_be(distributionColumn->vartype)),
					 errdatatype(distributionColumn->vartype),
					 errdetail("Partition column types must have a comparison function "
							   "defined to use range partitioning.")));
		}
	}

	if (PartitionTable(relationId))
	{
		parentRelationId = PartitionParentOid(relationId);
	}

	/* partitions cannot be distributed if their parent is not distributed */
	if (PartitionTable(relationId) && !IsCitusTable(parentRelationId))
	{
		char *parentRelationName = get_rel_name(parentRelationId);

		ereport(ERROR, (errmsg("cannot distribute relation \"%s\" which is partition of "
							   "\"%s\"", relationName, parentRelationName),
						errdetail("Citus does not support distributing partitions "
								  "if their parent is not a distributed table."),
						errhint("Distribute the partitioned table \"%s\" instead.",
								parentRelationName)));
	}

	/*
	 * These checks are mostly for partitioned tables, not partitions, because we
	 * prevent distributing partitions directly in the check above. However,
	 * partitions can still reach this point because we call CreateDistributedTable
	 * for partitions if their parent table is distributed.
	 */
	if (PartitionedTable(relationId))
	{
		/* we cannot distribute partitioned tables with master_create_distributed_table */
		if (viaDeprecatedAPI)
		{
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("distributing partitioned tables is only supported "
								   "with the create_distributed_table UDF")));
		}

		/* distributing partitioned tables is only supported for hash-distribution */
		if (distributionMethod != DISTRIBUTE_BY_HASH)
		{
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("distributing partitioned tables is only supported "
								   "for hash-distributed tables")));
		}

		/* we don't support distributing tables with multi-level partitioning */
		if (PartitionTable(relationId))
		{
			char *parentRelationName = get_rel_name(parentRelationId);

			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("distributing multi-level partitioned tables "
								   "is not supported"),
							errdetail("Relation \"%s\" is a partitioned table itself and "
									  "it is also a partition of relation \"%s\".",
									  relationName, parentRelationName)));
		}
	}

	ErrorIfUnsupportedConstraint(relation, distributionMethod, replicationModel,
								 distributionColumn, colocationId);


	ErrorIfUnsupportedPolicy(relation);
	relation_close(relation, NoLock);
}


/*
 * ErrorIfTableIsACatalogTable is a helper function that errors out when
 * creating a citus table from a catalog table.
 */
void
ErrorIfTableIsACatalogTable(Relation relation)
{
	if (relation->rd_rel->relnamespace != PG_CATALOG_NAMESPACE)
	{
		return;
	}

	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot create a citus table from a catalog table")));
}


/*
 * EnsureTableCanBeColocatedWith checks whether the given replication model and
 * distribution column type are suitable to distribute a table to be colocated
 * with the given source table.
 *
 * We only pass relationId to provide meaningful error messages.
 */
static void
EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
							  Oid distributionColumnType, Oid sourceRelationId)
{
	CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId);
	char sourceReplicationModel = sourceTableEntry->replicationModel;
	Var *sourceDistributionColumn = DistPartitionKeyOrError(sourceRelationId);

	if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot distribute relation"),
						errdetail("Currently, colocate_with option is only supported "
								  "for hash distributed tables.")));
	}

	if (sourceReplicationModel != replicationModel)
	{
		char *relationName = get_rel_name(relationId);
		char *sourceRelationName = get_rel_name(sourceRelationId);

		ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
							   sourceRelationName, relationName),
						errdetail("Replication models don't match for %s and %s.",
								  sourceRelationName, relationName)));
	}

	Oid sourceDistributionColumnType = sourceDistributionColumn->vartype;
	if (sourceDistributionColumnType != distributionColumnType)
	{
		char *relationName = get_rel_name(relationId);
		char *sourceRelationName = get_rel_name(sourceRelationId);

		ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
							   sourceRelationName, relationName),
						errdetail("Distribution column types don't match for "
								  "%s and %s.", sourceRelationName,
								  relationName)));
	}
}


/*
 * EnsureLocalTableEmptyIfNecessary errors out if the local table should be
 * empty according to ShouldLocalTableBeEmpty but is not.
 */
static void
EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod,
								 bool viaDeprecatedAPI)
{
	if (ShouldLocalTableBeEmpty(relationId, distributionMethod, viaDeprecatedAPI))
	{
		EnsureLocalTableEmpty(relationId);
	}
}


/*
 * ShouldLocalTableBeEmpty returns true if the local table should be empty
 * before creating a citus table.
 * In some cases, it is possible and safe to send local data to shards while
 * distributing the table. In those cases, we can distribute non-empty local
 * tables. This function checks the distributionMethod and relation kind to
 * see whether we need to ensure the emptiness of the local table.
 */
static bool
ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod,
						bool viaDeprecatedAPI)
{
	bool shouldLocalTableBeEmpty = false;
	if (viaDeprecatedAPI)
	{
		/* we don't support copying local data via the deprecated API */
		shouldLocalTableBeEmpty = true;
	}
	else if (distributionMethod != DISTRIBUTE_BY_HASH &&
			 distributionMethod != DISTRIBUTE_BY_NONE)
	{
		/*
		 * We only support hash distributed tables and reference tables
		 * for initial data loading
		 */
		shouldLocalTableBeEmpty = true;
	}
	else if (!RegularTable(relationId))
	{
		/*
		 * We only support tables and partitioned tables for initial
		 * data loading
		 */
		shouldLocalTableBeEmpty = true;
	}

	return shouldLocalTableBeEmpty;
}


/*
 * EnsureLocalTableEmpty errors out if the local table is not empty.
 */
static void
EnsureLocalTableEmpty(Oid relationId)
{
	char *relationName = get_rel_name(relationId);
	bool localTableEmpty = TableEmpty(relationId);

	if (!localTableEmpty)
	{
		ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
						errmsg("cannot distribute relation \"%s\"", relationName),
						errdetail("Relation \"%s\" contains data.", relationName),
						errhint("Empty your table before distributing it.")));
	}
}


/*
 * EnsureTableNotDistributed errors out if the table is distributed.
 */
void
EnsureTableNotDistributed(Oid relationId)
{
	char *relationName = get_rel_name(relationId);

	bool isCitusTable = IsCitusTable(relationId);

	if (isCitusTable)
	{
		ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
						errmsg("table \"%s\" is already distributed",
							   relationName)));
	}
}


/*
 * EnsureRelationHasNoTriggers errors out if the given table has triggers on
 * it. See also the comment of the GetExplicitTriggerIdList function for the
 * triggers this function errors out for.
 */
static void
EnsureRelationHasNoTriggers(Oid relationId)
{
	List *explicitTriggerIds = GetExplicitTriggerIdList(relationId);

	if (list_length(explicitTriggerIds) > 0)
	{
		char *relationName = get_rel_name(relationId);

		Assert(relationName != NULL);
		ereport(ERROR, (errmsg("cannot distribute relation \"%s\" because it has "
							   "triggers", relationName),
						errdetail("Citus does not support distributing tables with "
								  "triggers."),
						errhint("Drop all the triggers on \"%s\" and retry.",
								relationName)));
	}
}


/*
 * LookupDistributionMethod maps the oids of citus.distribution_type enum
 * values to pg_dist_partition.partmethod values.
 *
 * The passed in oid has to belong to a value of citus.distribution_type.
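 *
 * For example, the oid of the enum label 'hash' maps to DISTRIBUTE_BY_HASH,
 * which is what ends up in pg_dist_partition.partmethod.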
 */
char
LookupDistributionMethod(Oid distributionMethodOid)
{
	char distributionMethod = 0;

	HeapTuple enumTuple = SearchSysCache1(ENUMOID, ObjectIdGetDatum(
											  distributionMethodOid));
	if (!HeapTupleIsValid(enumTuple))
	{
		ereport(ERROR, (errmsg("invalid internal value for enum: %u",
							   distributionMethodOid)));
	}

	Form_pg_enum enumForm = (Form_pg_enum) GETSTRUCT(enumTuple);
	const char *enumLabel = NameStr(enumForm->enumlabel);

	if (strncmp(enumLabel, "append", NAMEDATALEN) == 0)
	{
		distributionMethod = DISTRIBUTE_BY_APPEND;
	}
	else if (strncmp(enumLabel, "hash", NAMEDATALEN) == 0)
	{
		distributionMethod = DISTRIBUTE_BY_HASH;
	}
	else if (strncmp(enumLabel, "range", NAMEDATALEN) == 0)
	{
		distributionMethod = DISTRIBUTE_BY_RANGE;
	}
	else
	{
		ereport(ERROR, (errmsg("invalid label for enum: %s", enumLabel)));
	}

	ReleaseSysCache(enumTuple);

	return distributionMethod;
}


/*
 *	SupportFunctionForColumn locates a support function given a column, an access
 *	method, and the id of a support function. This function returns InvalidOid if
 *	there is no support function for the operator class family of the column, but
 *	if the data type of the column has no default operator class whatsoever, this
 *	function errors out.
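 *
 *	For example (assuming built-in types), for an int4 distribution column and
 *	the hash access method this typically resolves to the standard hash support
 *	function of the default integer hash operator class (hashint4).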
 */
static Oid
SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
						 int16 supportFunctionNumber)
{
	Oid columnOid = partitionColumn->vartype;
	Oid operatorClassId = GetDefaultOpClass(columnOid, accessMethodId);

	/* currently we only support using the default operator class */
	if (operatorClassId == InvalidOid)
	{
		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
						errmsg("data type %s has no default operator class for specified"
							   " partition method", format_type_be(columnOid)),
						errdatatype(columnOid),
						errdetail("Partition column types must have a default operator"
								  " class defined.")));
	}

	Oid operatorFamilyId = get_opclass_family(operatorClassId);
	Oid operatorClassInputType = get_opclass_input_type(operatorClassId);
	Oid supportFunctionOid = get_opfamily_proc(operatorFamilyId, operatorClassInputType,
											   operatorClassInputType,
											   supportFunctionNumber);

	return supportFunctionOid;
}


/*
 * TableEmpty returns true if the given table contains no rows, and
 * returns false if there is any data.
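 *
 * The check runs a minimal probe via SPI, roughly equivalent to
 * "SELECT TRUE FROM <table> LIMIT 1" (see SELECT_TRUE_QUERY), rather than
 * counting rows, so it is cheap even for large tables.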
1440  */
1441 bool
TableEmpty(Oid tableId)1442 TableEmpty(Oid tableId)
1443 {
1444 	Oid schemaId = get_rel_namespace(tableId);
1445 	char *schemaName = get_namespace_name(schemaId);
1446 	char *tableName = get_rel_name(tableId);
1447 	char *tableQualifiedName = quote_qualified_identifier(schemaName, tableName);
1448 
1449 	StringInfo selectTrueQueryString = makeStringInfo();
1450 
1451 	bool readOnly = true;
1452 
1453 	int spiConnectionResult = SPI_connect();
1454 	if (spiConnectionResult != SPI_OK_CONNECT)
1455 	{
1456 		ereport(ERROR, (errmsg("could not connect to SPI manager")));
1457 	}
1458 
	appendStringInfo(selectTrueQueryString, SELECT_TRUE_QUERY, tableQualifiedName);

	int spiQueryResult = SPI_execute(selectTrueQueryString->data, readOnly, 0);
	if (spiQueryResult != SPI_OK_SELECT)
	{
		ereport(ERROR, (errmsg("execution was not successful: \"%s\"",
							   selectTrueQueryString->data)));
	}

	/* we expect the SELECT TRUE query to return a single row or an empty set */
	Assert(SPI_processed == 1 || SPI_processed == 0);

	bool localTableEmpty = !SPI_processed;

	SPI_finish();

	return localTableEmpty;
}


/*
 * CanUseExclusiveConnections checks if we can open parallel connections
 * while creating shards. We simply error out if we need to execute
 * sequentially but there is data in the table, since we cannot copy the
 * data to shards sequentially.
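 *
 * Otherwise the return value indicates whether shard creation needs
 * dedicated (exclusive) connections: true when the local data has to be
 * copied into the shards (the table is not empty) or when we are in a
 * multi-statement transaction, false in the remaining cases.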
 */
static bool
CanUseExclusiveConnections(Oid relationId, bool localTableEmpty)
{
	bool hasForeignKeyToReferenceTable = HasForeignKeyToReferenceTable(relationId);
	bool shouldRunSequential = MultiShardConnectionType == SEQUENTIAL_CONNECTION ||
							   hasForeignKeyToReferenceTable;

	if (!localTableEmpty && shouldRunSequential)
	{
		char *relationName = get_rel_name(relationId);

		ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode "
							   "because it is not empty", relationName),
						errhint("If you have manually set "
								"citus.multi_shard_modify_mode to 'sequential', "
								"try the 'parallel' option. If that is not the "
								"case, try distributing local tables when they "
								"are empty.")));
	}
	else if (shouldRunSequential && ParallelQueryExecutedInTransaction())
	{
		/*
		 * We decided to use sequential execution. It's either because the
		 * relation has a pre-existing foreign key to a reference table or
		 * because a query executed earlier in the current xact already
		 * forced sequential execution.
		 * We have a specific error message for each case.
		 */

		char *relationName = get_rel_name(relationId);

		if (hasForeignKeyToReferenceTable)
		{
			/*
			 * If a parallel query has already been executed, sequential mode
			 * would still use the already opened parallel connections to the
			 * workers, thus contradicting our purpose of using sequential
			 * mode.
			 */
			ereport(ERROR, (errmsg("cannot distribute relation \"%s\" in this "
								   "transaction because it has a foreign key to "
								   "a reference table", relationName),
							errdetail("If a hash distributed table has a foreign key "
									  "to a reference table, it has to be created "
									  "in sequential mode before any parallel commands "
									  "have been executed in the same transaction."),
							errhint("Try re-running the transaction with "
									"\"SET LOCAL citus.multi_shard_modify_mode TO "
									"\'sequential\';\"")));
		}
		else if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
		{
			ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode because "
								   "a parallel query was executed in this transaction",
								   relationName),
							errhint("If you have manually set "
									"citus.multi_shard_modify_mode to 'sequential', "
									"try the 'parallel' option.")));
		}
	}
	else if (shouldRunSequential)
	{
		return false;
	}
	else if (!localTableEmpty || IsMultiStatementTransaction())
	{
		return true;
	}

	return false;
}


/*
 * CreateTruncateTrigger creates a truncate trigger on the table identified by
 * relationId and assigns citus_truncate_trigger() as its handler.
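 *
 * The result is roughly equivalent to running:
 *
 *   CREATE TRIGGER truncate_trigger AFTER TRUNCATE ON <table>
 *   FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger();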
 */
void
CreateTruncateTrigger(Oid relationId)
{
	StringInfo triggerName = makeStringInfo();
	bool internal = true;

	appendStringInfo(triggerName, "truncate_trigger");

	CreateTrigStmt *trigger = makeNode(CreateTrigStmt);
	trigger->trigname = triggerName->data;
	trigger->relation = NULL;
	trigger->funcname = SystemFuncName(CITUS_TRUNCATE_TRIGGER_NAME);
	trigger->args = NIL;
	trigger->row = false;
	trigger->timing = TRIGGER_TYPE_AFTER;
	trigger->events = TRIGGER_TYPE_TRUNCATE;
	trigger->columns = NIL;
	trigger->whenClause = NULL;
	trigger->isconstraint = false;

	CreateTrigger(trigger, NULL, relationId, InvalidOid, InvalidOid, InvalidOid,
				  InvalidOid, InvalidOid, NULL,
				  internal, false);
}


/*
 * RegularTable returns true if the given table's relation kind is
 * RELKIND_RELATION or RELKIND_PARTITIONED_TABLE; otherwise it returns false.
 */
bool
RegularTable(Oid relationId)
{
	char relationKind = get_rel_relkind(relationId);

	if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE)
	{
		return true;
	}

	return false;
}


/*
 * CopyLocalDataIntoShards copies data from the local table, which is hidden
 * after converting it to a distributed table, into the shards of the
 * distributed table. For partitioned tables, this function returns without
 * copying the data because we call this function for both a partitioned
 * table and its partitions. Returning early saves us from copying the data
 * to the workers twice.
 *
 * This function uses CitusCopyDestReceiver to invoke the distributed COPY
 * logic. We cannot use a regular COPY here since that cannot read from a
 * table. Instead we read from the table and pass each tuple to the
 * CitusCopyDestReceiver, which opens a connection and starts a COPY for
 * each shard placement that will have data.
 *
 * We could call the planner and executor here and send the output to the
 * DestReceiver, but we are in a tricky spot here since Citus is already
 * intercepting queries on this table in the planner and executor hooks and we
 * want to read from the local table. To keep it simple, we perform a heap scan
 * directly on the table.
 *
 * Any writes on the table that are started during this operation will be
 * handled as distributed queries once the current transaction commits.
 * SELECTs will continue to read from the local table until the current
 * transaction commits, after which new SELECTs will be handled as
 * distributed queries.
 *
 * After copying local data into the distributed table, the local data remains
 * in place and should be truncated at a later time.
 */
static void
CopyLocalDataIntoShards(Oid distributedRelationId)
{
	/* take an ExclusiveLock to block all operations except SELECT */
	Relation distributedRelation = table_open(distributedRelationId, ExclusiveLock);

	/*
	 * Skip copying from partitioned tables, we will copy the data from
	 * each partition to the partition's shards.
	 */
	if (PartitionedTable(distributedRelationId))
	{
		table_close(distributedRelation, NoLock);

		return;
	}

	/*
	 * All writes have finished, make sure that we can see them by using the
	 * latest snapshot. We use GetLatestSnapshot instead of
	 * GetTransactionSnapshot since the latter would not reveal all writes
	 * in serializable or repeatable read mode. Note that subsequent reads
	 * from the distributed table would reveal those writes, temporarily
	 * violating the isolation level. However, this seems preferable over
	 * dropping the writes entirely.
	 */
	PushActiveSnapshot(GetLatestSnapshot());

	/* get the table columns */
	TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation);
	TupleTableSlot *slot = CreateTableSlotForRel(distributedRelation);
	List *columnNameList = TupleDescColumnNameList(tupleDescriptor);

	int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;

	/* determine the partition column in the tuple descriptor */
	Var *partitionColumn = PartitionColumn(distributedRelationId, 0);
	if (partitionColumn != NULL)
	{
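		/* varattno is 1-based; convert to a 0-based column index */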
		partitionColumnIndex = partitionColumn->varattno - 1;
	}

	/* initialise per-tuple memory context */
	EState *estate = CreateExecutorState();
	ExprContext *econtext = GetPerTupleExprContext(estate);
	econtext->ecxt_scantuple = slot;

	bool stopOnFailure = true;
	DestReceiver *copyDest =
		(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
													 columnNameList,
													 partitionColumnIndex,
													 estate, stopOnFailure,
													 NULL);

	/* initialise state for writing to shards, we'll open connections on demand */
	copyDest->rStartup(copyDest, 0, tupleDescriptor);

	DoCopyFromLocalTableIntoShards(distributedRelation, copyDest, slot, estate);

	/* finish writing into the shards */
	copyDest->rShutdown(copyDest);
	copyDest->rDestroy(copyDest);

	/* free memory and close the relation */
	ExecDropSingleTupleTableSlot(slot);
	FreeExecutorState(estate);
	table_close(distributedRelation, NoLock);

	PopActiveSnapshot();
}


/*
 * DoCopyFromLocalTableIntoShards performs a copy operation
 * from the local table into the shards.
 */
static void
DoCopyFromLocalTableIntoShards(Relation distributedRelation,
							   DestReceiver *copyDest,
							   TupleTableSlot *slot,
							   EState *estate)
{
	/* begin reading from local table */
	TableScanDesc scan = table_beginscan(distributedRelation, GetActiveSnapshot(), 0,
										 NULL);

	MemoryContext oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
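
	/*
	 * From here on, per-row allocations happen in the per-tuple memory
	 * context, which ResetPerTupleExprContext() below resets after every
	 * tuple.
	 */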

	uint64 rowsCopied = 0;
	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
	{
		/* send the tuple to a shard */
		copyDest->receiveSlot(slot, copyDest);

		/* clear tuple memory */
		ResetPerTupleExprContext(estate);

		/* make sure we roll back on cancellation */
		CHECK_FOR_INTERRUPTS();

		if (rowsCopied == 0)
		{
			ereport(NOTICE, (errmsg("Copying data from local table...")));
		}

		rowsCopied++;

		if (rowsCopied % LOG_PER_TUPLE_AMOUNT == 0)
		{
			ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
		}
	}

	if (rowsCopied % LOG_PER_TUPLE_AMOUNT != 0)
	{
		ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
	}

	if (rowsCopied > 0)
	{
		char *qualifiedRelationName =
			generate_qualified_relation_name(RelationGetRelid(distributedRelation));
		ereport(NOTICE, (errmsg("copying the data has completed"),
						 errdetail("The local data in the table is no longer visible, "
								   "but is still on disk."),
						 errhint("To remove the local data, run: SELECT "
								 "truncate_local_data_after_distributing_table($$%s$$)",
								 qualifiedRelationName)));
	}

	MemoryContextSwitchTo(oldContext);

	/* finish reading from the local table */
	table_endscan(scan);
}


/*
 * TupleDescColumnNameList returns a list of column names for the given tuple
 * descriptor as plain strings.
 */
static List *
TupleDescColumnNameList(TupleDesc tupleDescriptor)
{
	List *columnNameList = NIL;

	for (int columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++)
	{
		Form_pg_attribute currentColumn = TupleDescAttr(tupleDescriptor, columnIndex);
		char *columnName = NameStr(currentColumn->attname);

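		/*
		 * Skip dropped columns, which no longer hold data, and GENERATED ...
		 * STORED columns, whose values cannot be supplied in a COPY and are
		 * recomputed on the shards instead.
		 */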
		if (currentColumn->attisdropped ||
			currentColumn->attgenerated == ATTRIBUTE_GENERATED_STORED)
		{
			continue;
		}

		columnNameList = lappend(columnNameList, columnName);
	}

	return columnNameList;
}


/*
 * RelationUsesIdentityColumns returns whether a given relation uses
 * GENERATED ... AS IDENTITY.
 */
bool
RelationUsesIdentityColumns(TupleDesc relationDesc)
{
	for (int attributeIndex = 0; attributeIndex < relationDesc->natts; attributeIndex++)
	{
		Form_pg_attribute attributeForm = TupleDescAttr(relationDesc, attributeIndex);

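		/* attidentity is '\0' unless the column is GENERATED ... AS IDENTITY */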
		if (attributeForm->attidentity != '\0')
		{
			return true;
		}
	}

	return false;
}


/*
 * DistributionColumnUsesGeneratedStoredColumn returns whether a given relation
 * uses GENERATED ALWAYS AS (...) STORED on its distribution column.
 */
static bool
DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
											Var *distributionColumn)
{
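	/* varattno is 1-based; TupleDescAttr() expects a 0-based index */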
	Form_pg_attribute attributeForm = TupleDescAttr(relationDesc,
													distributionColumn->varattno - 1);

	if (attributeForm->attgenerated == ATTRIBUTE_GENERATED_STORED)
	{
		return true;
	}

	return false;
}