/*-------------------------------------------------------------------------
 *
 * create_distributed_table.c
 *	  Routines related to the creation of distributed relations.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "miscadmin.h"

#include "distributed/pg_version_constants.h"
#include "distributed/commands/utility_hook.h"

#include "access/genam.h"
#include "access/hash.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_enum.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_trigger.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/distribution_column.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/shared_library_init.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "distributed/version_compat.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "nodes/execnodes.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
#include "parser/parse_expr.h"
#include "parser/parse_node.h"
#include "parser/parse_relation.h"
#include "parser/parser.h"
#include "storage/lmgr.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/inval.h"

/*
 * The progress of the COPY of local data into shards is logged once every
 * LOG_PER_TUPLE_AMOUNT tuples.
 */
#define LOG_PER_TUPLE_AMOUNT 1000000

/* local function forward declarations */
static char DecideReplicationModel(char distributionMethod, char *colocateWithTableName,
								   bool viaDeprecatedAPI);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
											 Oid colocatedTableId, bool localTableEmpty);
static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
									  char distributionMethod, char replicationModel,
									  int shardCount, bool shardCountIsStrict,
									  char *colocateWithTableName,
									  bool viaDeprecatedAPI);
static void EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
										   char distributionMethod, uint32 colocationId,
										   char replicationModel, bool viaDeprecatedAPI);
static void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
										  Oid distributionColumnType,
										  Oid sourceRelationId);
static void EnsureLocalTableEmpty(Oid relationId);
static void EnsureRelationHasNoTriggers(Oid relationId);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
									int16 supportFunctionNumber);
static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod,
											 bool viaDeprecatedAPI);
static bool ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod,
									bool viaDeprecatedAPI);
static void EnsureCitusTableCanBeCreated(Oid relationOid);
static List * GetFKeyCreationCommandsRelationInvolvedWithTableType(Oid relationId,
																   int tableTypeFlag);
static Oid DropFKeysAndUndistributeTable(Oid relationId);
static void DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag);
static void CopyLocalDataIntoShards(Oid relationId);
static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
														Var *distributionColumn);
static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty);
static void DoCopyFromLocalTableIntoShards(Relation distributedRelation,
										   DestReceiver *copyDest,
										   TupleTableSlot *slot,
										   EState *estate);

/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
PG_FUNCTION_INFO_V1(create_distributed_table);
PG_FUNCTION_INFO_V1(create_reference_table);


/*
 * master_create_distributed_table accepts a table, distribution column and
 * method and performs the corresponding catalog changes.
 *
 * Note that this UDF is deprecated and cannot create colocated tables, so we
 * always use INVALID_COLOCATION_ID.
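 *
 * For reference, a typical invocation from SQL looks like the following,
 * where the table and column names are illustrative placeholders:
 *
 *   SELECT master_create_distributed_table('events', 'user_id', 'hash');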
 */
Datum
master_create_distributed_table(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	Oid relationId = PG_GETARG_OID(0);
	text *distributionColumnText = PG_GETARG_TEXT_P(1);
	Oid distributionMethodOid = PG_GETARG_OID(2);

	EnsureCitusTableCanBeCreated(relationId);

	char *colocateWithTableName = NULL;
	bool viaDeprecatedAPI = true;

	/*
	 * Lock target relation with an exclusive lock - there's no way to make
	 * sense of this table until we've committed, and we don't want multiple
	 * backends manipulating this relation.
	 */
	Relation relation = try_relation_open(relationId, ExclusiveLock);

	if (relation == NULL)
	{
		ereport(ERROR, (errmsg("could not create distributed table: "
							   "relation does not exist")));
	}

	char *distributionColumnName = text_to_cstring(distributionColumnText);
	Var *distributionColumn = BuildDistributionKeyFromColumnName(relation,
																 distributionColumnName);
	Assert(distributionColumn != NULL);
	char distributionMethod = LookupDistributionMethod(distributionMethodOid);

	CreateDistributedTable(relationId, distributionColumn, distributionMethod,
						   ShardCount, false, colocateWithTableName, viaDeprecatedAPI);

	relation_close(relation, NoLock);

	PG_RETURN_VOID();
}


/*
 * create_distributed_table gets a table name, distribution column,
 * distribution method and colocate_with option, then it creates a
 * distributed table.
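 *
 * For reference, typical invocations from SQL look like the following,
 * where the table and column names are illustrative placeholders:
 *
 *   SELECT create_distributed_table('events', 'user_id');
 *   SELECT create_distributed_table('events', 'user_id',
 *                                   colocate_with => 'users');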
 */
Datum
create_distributed_table(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
	{
		PG_RETURN_VOID();
	}
	bool viaDeprecatedAPI = false;

	Oid relationId = PG_GETARG_OID(0);
	text *distributionColumnText = PG_GETARG_TEXT_P(1);
	Oid distributionMethodOid = PG_GETARG_OID(2);
	text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
	char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);

	bool shardCountIsStrict = false;
	int shardCount = ShardCount;
	if (!PG_ARGISNULL(4))
	{
		if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
			pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
		{
			ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
								   "and shard_count at the same time")));
		}

		shardCount = PG_GETARG_INT32(4);

		/*
		 * If the shard_count parameter is given, then we have to make
		 * sure the table has exactly that many shards.
		 */
		shardCountIsStrict = true;
	}

	EnsureCitusTableCanBeCreated(relationId);

	/* enable create_distributed_table on an empty node */
	InsertCoordinatorIfClusterEmpty();

	/*
	 * Lock target relation with an exclusive lock - there's no way to make
	 * sense of this table until we've committed, and we don't want multiple
	 * backends manipulating this relation.
	 */
	Relation relation = try_relation_open(relationId, ExclusiveLock);
	if (relation == NULL)
	{
		ereport(ERROR, (errmsg("could not create distributed table: "
							   "relation does not exist")));
	}

	char *distributionColumnName = text_to_cstring(distributionColumnText);
	Var *distributionColumn = BuildDistributionKeyFromColumnName(relation,
																 distributionColumnName);
	Assert(distributionColumn != NULL);
	char distributionMethod = LookupDistributionMethod(distributionMethodOid);

	/* the exclusive lock is retained until commit, only the relcache reference is released */
	relation_close(relation, NoLock);

	if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
	{
		ereport(ERROR, (errmsg("%d is outside the valid range for "
							   "parameter \"shard_count\" (1 .. %d)",
							   shardCount, MAX_SHARD_COUNT)));
	}

	CreateDistributedTable(relationId, distributionColumn, distributionMethod,
						   shardCount, shardCountIsStrict, colocateWithTableName,
						   viaDeprecatedAPI);

	PG_RETURN_VOID();
}


/*
 * create_reference_table creates a distributed table with the given relationId.
 * The created table has one shard and its replication factor is set to the
 * active worker count. In fact, that is the definition of a reference table
 * in Citus.
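 *
 * For reference, an illustrative invocation from SQL looks like the
 * following, where the table name is a placeholder:
 *
 *   SELECT create_reference_table('countries');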
 */
Datum
create_reference_table(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	Oid relationId = PG_GETARG_OID(0);

	char *colocateWithTableName = NULL;
	Var *distributionColumn = NULL;

	bool viaDeprecatedAPI = false;

	EnsureCitusTableCanBeCreated(relationId);

	/* enable create_reference_table on an empty node */
	InsertCoordinatorIfClusterEmpty();

	/*
	 * Lock target relation with an exclusive lock - there's no way to make
	 * sense of this table until we've committed, and we don't want multiple
	 * backends manipulating this relation.
	 */
	Relation relation = try_relation_open(relationId, ExclusiveLock);
	if (relation == NULL)
	{
		ereport(ERROR, (errmsg("could not create reference table: "
							   "relation does not exist")));
	}

	relation_close(relation, NoLock);

	List *workerNodeList = ActivePrimaryNodeList(ShareLock);
	int workerCount = list_length(workerNodeList);

	/* if there are no workers, error out */
	if (workerCount == 0)
	{
		char *relationName = get_rel_name(relationId);

		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("cannot create reference table \"%s\"", relationName),
						errdetail("There are no active worker nodes.")));
	}

	CreateDistributedTable(relationId, distributionColumn, DISTRIBUTE_BY_NONE,
						   ShardCount, false, colocateWithTableName, viaDeprecatedAPI);
	PG_RETURN_VOID();
}


/*
 * EnsureCitusTableCanBeCreated checks that
 * - we are on the coordinator
 * - the current user is the owner of the table
 * - the relation kind is supported
 */
static void
EnsureCitusTableCanBeCreated(Oid relationOid)
{
	EnsureCoordinator();
	EnsureRelationExists(relationOid);
	EnsureTableOwner(relationOid);

	/*
	 * We should do this check here since the code in the following lines
	 * relies on this relation having a supported relation kind. More
	 * extensive checks will be performed in CreateDistributedTable.
	 */
	EnsureRelationKindSupported(relationOid);
}


/*
 * EnsureRelationExists does a basic check on whether the OID belongs to
 * an existing relation.
 */
void
EnsureRelationExists(Oid relationId)
{
	if (!RelationExists(relationId))
	{
		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						errmsg("relation with OID %u does not exist",
							   relationId)));
	}
}


/*
 * CreateDistributedTable creates a distributed table with the given
 * configuration. This function contains all the logic necessary to create
 * distributed tables, and it performs the checks required to ensure that
 * distributing the table is safe. If it is safe to distribute the table,
 * this function creates distributed table metadata, creates shards and
 * copies local data to the shards. This function also handles partitioned
 * tables by distributing their partitions as well.
 *
 * The viaDeprecatedAPI boolean flag is not the optimal way to implement
 * this function, but it reduces code duplication a lot. We hope to remove
 * the flag one day, once we deprecate master_create_distributed_table
 * completely.
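 *
 * At a high level the function proceeds as follows: it undistributes an
 * input citus local table and/or drops conflicting foreign keys when
 * needed, ensures the table's dependencies exist on all nodes, decides the
 * replication model and colocation id, inserts the pg_dist_partition
 * entry, creates shards, recursively distributes any partitions, copies
 * local data into the shards, and finally recreates the dropped foreign
 * keys.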
 */
void
CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod,
					   int shardCount, bool shardCountIsStrict,
					   char *colocateWithTableName, bool viaDeprecatedAPI)
{
	/*
	 * EnsureTableNotDistributed errors out when the relation is already a
	 * citus table, but we don't want to ask users to first undistribute
	 * their citus local tables when creating reference or distributed
	 * tables from them.
	 * For this reason, here we undistribute citus local tables beforehand.
	 * But since UndistributeTable does not support undistributing relations
	 * involved in foreign key relationships, we first drop the foreign keys
	 * that the given relation is involved in, then we undistribute the
	 * relation and finally we re-create the dropped foreign keys at the end
	 * of this function.
	 */
	List *originalForeignKeyRecreationCommands = NIL;
	if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
	{
		/* store the foreign key creation commands that the relation is involved in */
		originalForeignKeyRecreationCommands =
			GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId,
																 INCLUDE_ALL_TABLE_TYPES);
		relationId = DropFKeysAndUndistributeTable(relationId);
	}

	/*
	 * To support foreign keys between reference tables and local tables,
	 * we drop & re-define foreign keys at the end of this function so
	 * that the ALTER TABLE hook does the necessary job, which means
	 * converting local tables to citus local tables to properly support
	 * such foreign keys.
	 *
	 * This function does not expect to create citus local tables, so we
	 * blindly create a reference table when the method is DISTRIBUTE_BY_NONE.
	 */
	else if (distributionMethod == DISTRIBUTE_BY_NONE &&
			 ShouldEnableLocalReferenceForeignKeys() &&
			 HasForeignKeyWithLocalTable(relationId))
	{
		/*
		 * Store the foreign key creation commands for the foreign key
		 * relationships that the relation has with postgres tables.
		 */
		originalForeignKeyRecreationCommands =
			GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId,
																 INCLUDE_LOCAL_TABLES);

		/*
		 * Soon we will convert local tables to citus local tables. As
		 * CreateCitusLocalTable needs to use local execution, now we
		 * switch to local execution beforehand so that reference table
		 * creation doesn't use remote execution and we don't error out
		 * in CreateCitusLocalTable.
		 */
		SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);

		DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_LOCAL_TABLES);
	}

	/*
	 * Distributed tables might have dependencies on different objects.
	 * Since we create shards for a distributed table via multiple sessions,
	 * these objects will be created via their own connections and committed
	 * immediately, so that they become visible to all sessions creating
	 * shards.
	 */
	ObjectAddress tableAddress = { 0 };
	ObjectAddressSet(tableAddress, RelationRelationId, relationId);
	EnsureDependenciesExistOnAllNodes(&tableAddress);

	char replicationModel = DecideReplicationModel(distributionMethod,
												   colocateWithTableName,
												   viaDeprecatedAPI);

	/*
	 * Due to dropping columns, the parent's distribution key may not match the
	 * partition's distribution key. The input distributionColumn belongs to
	 * the parent. That's why we override the distribution column of partitions
	 * here. See issue #5123 for details.
	 */
	if (PartitionTable(relationId))
	{
		Oid parentRelationId = PartitionParentOid(relationId);
		char *distributionColumnName =
			ColumnToColumnName(parentRelationId, nodeToString(distributionColumn));

		distributionColumn =
			FindColumnWithNameOnTargetRelation(parentRelationId, distributionColumnName,
											   relationId);
	}

	/*
	 * ColocationIdForNewTable assumes the caller acquires a lock on
	 * relationId. In our case, our caller already acquired a lock on
	 * relationId.
	 */
	uint32 colocationId = ColocationIdForNewTable(relationId, distributionColumn,
												  distributionMethod, replicationModel,
												  shardCount, shardCountIsStrict,
												  colocateWithTableName,
												  viaDeprecatedAPI);

	EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod,
								   colocationId, replicationModel, viaDeprecatedAPI);

	/*
	 * Make sure that existing reference tables have been replicated to all
	 * the nodes, so that we can create foreign keys and joins work
	 * immediately after creation.
	 */
	EnsureReferenceTablesExistOnAllNodes();

	/* we need to calculate these variables before creating distributed metadata */
	bool localTableEmpty = TableEmpty(relationId);
	Oid colocatedTableId = ColocatedTableId(colocationId);

	/* create an entry for the distributed table in pg_dist_partition */
	InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
							  colocationId, replicationModel);

	/*
	 * Ensure that the sequences used in column defaults of the table
	 * have proper types.
	 */
	List *attnumList = NIL;
	List *dependentSequenceList = NIL;
	GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);
	EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
										  attnumList);

	/* foreign tables do not support TRUNCATE triggers */
	if (RegularTable(relationId))
	{
		CreateTruncateTrigger(relationId);
	}

	/*
	 * If we are using master_create_distributed_table, we don't need to
	 * continue, because the deprecated API does not support the following
	 * features.
	 */
	if (viaDeprecatedAPI)
	{
		Assert(colocateWithTableName == NULL);
		return;
	}

	/* create shards for hash distributed and reference tables */
	if (distributionMethod == DISTRIBUTE_BY_HASH)
	{
		CreateHashDistributedTableShards(relationId, shardCount, colocatedTableId,
										 localTableEmpty);
	}
	else if (distributionMethod == DISTRIBUTE_BY_NONE)
	{
		/*
		 * This function does not expect to create citus local tables, so we
		 * blindly create a reference table when the method is
		 * DISTRIBUTE_BY_NONE.
		 */
		CreateReferenceTableShard(relationId);
	}

	if (ShouldSyncTableMetadata(relationId))
	{
		if (ClusterHasKnownMetadataWorkers())
		{
			/*
			 * Ensure sequence dependencies and mark them as distributed
			 * before creating table metadata on workers.
			 */
			MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList);
		}

		CreateTableMetadataOnWorkers(relationId);
	}

	/*
	 * We have a custom way of invalidating the foreign key graph, see
	 * InvalidateForeignKeyGraph().
	 */
	if (TableReferenced(relationId) || TableReferencing(relationId))
	{
		InvalidateForeignKeyGraph();
	}

	/* if this table is a partitioned table, distribute its partitions too */
	if (PartitionedTable(relationId))
	{
		List *partitionList = PartitionList(relationId);
		Oid partitionRelationId = InvalidOid;
		Oid namespaceId = get_rel_namespace(relationId);
		char *schemaName = get_namespace_name(namespaceId);
		char *relationName = get_rel_name(relationId);
		char *parentRelationName = quote_qualified_identifier(schemaName, relationName);

		foreach_oid(partitionRelationId, partitionList)
		{
			CreateDistributedTable(partitionRelationId, distributionColumn,
								   distributionMethod, shardCount, false,
								   parentRelationName, viaDeprecatedAPI);
		}
	}

	/* copy over data for hash distributed and reference tables */
	if (distributionMethod == DISTRIBUTE_BY_HASH ||
		distributionMethod == DISTRIBUTE_BY_NONE)
	{
		if (RegularTable(relationId))
		{
			CopyLocalDataIntoShards(relationId);
		}
	}

	/*
	 * Now recreate the foreign keys that we dropped beforehand. As
	 * modifications are not allowed on the relations that are involved
	 * in the foreign key relationship, we can skip the validation of the
	 * foreign keys.
	 */
	bool skip_validation = true;
	ExecuteForeignKeyCreateCommandList(originalForeignKeyRecreationCommands,
									   skip_validation);
}


/*
 * EnsureSequenceTypeSupported ensures that the type of the column that uses
 * a sequence on its DEFAULT is consistent with previous uses (if any) of the
 * sequence in distributed tables.
 * If any other distributed table uses the input sequence, it checks whether
 * the types of the columns using the sequence match. If they don't, it
 * errors out. Otherwise, the condition is ensured.
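 *
 * For example (names are illustrative): if sequence "s" already backs an
 * int column of one distributed table, attempting to distribute another
 * table that uses "s" as the default of a bigint column errors out here.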
 */
void
EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId)
{
	List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
	Oid citusTableId = InvalidOid;
	foreach_oid(citusTableId, citusTableIdList)
	{
		List *attnumList = NIL;
		List *dependentSequenceList = NIL;
		GetDependentSequencesWithRelation(citusTableId, &attnumList,
										  &dependentSequenceList, 0);
		ListCell *attnumCell = NULL;
		ListCell *dependentSequenceCell = NULL;
		forboth(attnumCell, attnumList, dependentSequenceCell,
				dependentSequenceList)
		{
			AttrNumber currentAttnum = lfirst_int(attnumCell);
			Oid currentSeqOid = lfirst_oid(dependentSequenceCell);

			/*
			 * If another distributed table is using the same sequence
			 * in one of its column defaults, make sure the types of the
			 * columns match.
			 */
			if (currentSeqOid == seqOid)
			{
				Oid currentSeqTypId = GetAttributeTypeOid(citusTableId,
														  currentAttnum);
				if (seqTypId != currentSeqTypId)
				{
					char *sequenceName = generate_qualified_relation_name(
						seqOid);
					char *citusTableName =
						generate_qualified_relation_name(citusTableId);
					ereport(ERROR, (errmsg(
										"The sequence %s is already used for a different"
										" type in column %d of the table %s",
										sequenceName, currentAttnum,
										citusTableName)));
				}
			}
		}
	}
}


/*
 * AlterSequenceType alters the given sequence's type to the given type.
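 * Conceptually, it builds and executes the equivalent of an
 * "ALTER SEQUENCE <sequence> AS <type>;" statement when the types differ.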
 */
void
AlterSequenceType(Oid seqOid, Oid typeOid)
{
	Form_pg_sequence sequenceData = pg_get_sequencedef(seqOid);
	Oid currentSequenceTypeOid = sequenceData->seqtypid;
	if (currentSequenceTypeOid != typeOid)
	{
		AlterSeqStmt *alterSequenceStatement = makeNode(AlterSeqStmt);
		char *seqNamespace = get_namespace_name(get_rel_namespace(seqOid));
		char *seqName = get_rel_name(seqOid);
		alterSequenceStatement->sequence = makeRangeVar(seqNamespace, seqName, -1);
		Node *asTypeNode = (Node *) makeTypeNameFromOid(typeOid, -1);
		SetDefElemArg(alterSequenceStatement, "as", asTypeNode);
		ParseState *pstate = make_parsestate(NULL);
		AlterSequence(pstate, alterSequenceStatement);
	}
}


/*
 * MarkSequenceListDistributedAndPropagateDependencies ensures dependencies
 * for the given sequence list exist on all nodes and marks the sequences
 * as distributed.
 */
void
MarkSequenceListDistributedAndPropagateDependencies(List *sequenceList)
{
	Oid sequenceOid = InvalidOid;
	foreach_oid(sequenceOid, sequenceList)
	{
		MarkSequenceDistributedAndPropagateDependencies(sequenceOid);
	}
}


/*
 * MarkSequenceDistributedAndPropagateDependencies ensures dependencies
 * for the given sequence exist on all nodes and marks the sequence
 * as distributed.
 */
void
MarkSequenceDistributedAndPropagateDependencies(Oid sequenceOid)
{
	/* get sequence address */
	ObjectAddress sequenceAddress = { 0 };
	ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
	EnsureDependenciesExistOnAllNodes(&sequenceAddress);
	MarkObjectDistributed(&sequenceAddress);
}


/*
 * EnsureDistributedSequencesHaveOneType ensures, for each sequence in the
 * input dependentSequenceList, that the type of the column in which the
 * sequence is used as a default is supported, and then alters the sequence
 * type if it is not the same as the column type.
 */
void
EnsureDistributedSequencesHaveOneType(Oid relationId, List *dependentSequenceList,
									  List *attnumList)
{
	ListCell *attnumCell = NULL;
	ListCell *dependentSequenceCell = NULL;
	forboth(attnumCell, attnumList, dependentSequenceCell, dependentSequenceList)
	{
		AttrNumber attnum = lfirst_int(attnumCell);
		Oid sequenceOid = lfirst_oid(dependentSequenceCell);

		/*
		 * We should make sure that the type of the column that uses
		 * that sequence is supported.
		 */
		Oid seqTypId = GetAttributeTypeOid(relationId, attnum);
		EnsureSequenceTypeSupported(sequenceOid, seqTypId);

		/*
		 * Alter the sequence's data type in the coordinator if needed.
		 * A sequence's type is bigint by default and it doesn't change even
		 * if it's used in an int column. We should change the type if
		 * needed, and not allow future ALTER SEQUENCE ... TYPE ... commands
		 * for sequences used as defaults in distributed tables.
		 */
		AlterSequenceType(sequenceOid, seqTypId);
	}
}


/*
 * GetFKeyCreationCommandsRelationInvolvedWithTableType returns a list of
 * DDL commands to recreate the foreign keys, matching the given table type,
 * that the relation with relationId is involved in.
 */
static List *
GetFKeyCreationCommandsRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag)
{
	int referencingFKeysFlag = INCLUDE_REFERENCING_CONSTRAINTS |
							   tableTypeFlag;
	List *referencingFKeyCreationCommands =
		GetForeignConstraintCommandsInternal(relationId, referencingFKeysFlag);

	/* already captured self referencing foreign keys, so use EXCLUDE_SELF_REFERENCES */
	int referencedFKeysFlag = INCLUDE_REFERENCED_CONSTRAINTS |
							  EXCLUDE_SELF_REFERENCES |
							  tableTypeFlag;
	List *referencedFKeyCreationCommands =
		GetForeignConstraintCommandsInternal(relationId, referencedFKeysFlag);
	return list_concat(referencingFKeyCreationCommands, referencedFKeyCreationCommands);
}


/*
 * DropFKeysAndUndistributeTable drops all foreign keys that the relation
 * with relationId is involved in, then undistributes it.
 * Note that as UndistributeTable changes the relationId of the relation,
 * this function also returns the new relationId.
 * Also note that callers are responsible for storing & recreating the
 * dropped foreign keys if needed.
 */
static Oid
DropFKeysAndUndistributeTable(Oid relationId)
{
	DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_ALL_TABLE_TYPES);

	/* store them before calling UndistributeTable as it changes relationId */
	char *relationName = get_rel_name(relationId);
	Oid schemaId = get_rel_namespace(relationId);

	/* suppress notice messages to avoid being too verbose */
	TableConversionParameters params = {
		.relationId = relationId,
		.cascadeViaForeignKeys = false,
		.suppressNoticeMessages = true
	};
	UndistributeTable(&params);

	Oid newRelationId = get_relname_relid(relationName, schemaId);

	/*
	 * We don't expect this to happen but to be on the safe side let's error
	 * out here.
	 */
	EnsureRelationExists(newRelationId);

	return newRelationId;
}


/*
 * DropFKeysRelationInvolvedWithTableType drops the foreign keys, matching
 * the given table type, that the relation with relationId is involved in.
 */
static void
DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag)
{
	int referencingFKeysFlag = INCLUDE_REFERENCING_CONSTRAINTS |
							   tableTypeFlag;
	DropRelationForeignKeys(relationId, referencingFKeysFlag);

	/* already captured self referencing foreign keys, so use EXCLUDE_SELF_REFERENCES */
	int referencedFKeysFlag = INCLUDE_REFERENCED_CONSTRAINTS |
							  EXCLUDE_SELF_REFERENCES |
							  tableTypeFlag;
	DropRelationForeignKeys(relationId, referencedFKeysFlag);
}


/*
 * DecideReplicationModel decides which replication model should be used
 * based on the given distribution configuration.
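 *
 * In short: the deprecated API implies coordinator (statement-based)
 * replication, DISTRIBUTE_BY_NONE implies 2PC, colocating with a specific
 * table inherits that table's replication model, and hash distribution
 * uses streaming replication unless distributed table replication is
 * enabled.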
 */
static char
DecideReplicationModel(char distributionMethod, char *colocateWithTableName,
					   bool viaDeprecatedAPI)
{
	if (viaDeprecatedAPI)
	{
		return REPLICATION_MODEL_COORDINATOR;
	}
	else if (distributionMethod == DISTRIBUTE_BY_NONE)
	{
		return REPLICATION_MODEL_2PC;
	}
	else if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
			 !IsColocateWithNone(colocateWithTableName))
	{
		text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
		Oid colocatedRelationId = ResolveRelationId(colocateWithTableNameText, false);
		CitusTableCacheEntry *targetTableEntry = GetCitusTableCacheEntry(
			colocatedRelationId);
		char replicationModel = targetTableEntry->replicationModel;

		return replicationModel;
	}
	else if (distributionMethod == DISTRIBUTE_BY_HASH &&
			 !DistributedTableReplicationIsEnabled())
	{
		return REPLICATION_MODEL_STREAMING;
	}
	else
	{
		return REPLICATION_MODEL_COORDINATOR;
	}

	/* we should not reach this point */
	return REPLICATION_MODEL_INVALID;
}


/*
 * CreateHashDistributedTableShards creates the shards of the given hash
 * distributed table.
 */
static void
CreateHashDistributedTableShards(Oid relationId, int shardCount,
								 Oid colocatedTableId, bool localTableEmpty)
{
	bool useExclusiveConnection = false;

	/*
	 * Decide whether to use exclusive connections per placement or not. Note that
	 * if the local table is not empty, we cannot use sequential mode since the COPY
	 * operation that'd load the data into shards currently requires exclusive
	 * connections.
	 */
	if (RegularTable(relationId))
	{
		useExclusiveConnection = CanUseExclusiveConnections(relationId,
															localTableEmpty);
	}

	if (colocatedTableId != InvalidOid)
	{
		CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
	}
	else
	{
		/*
		 * This path is only reached by create_distributed_table for the distributed
		 * tables which will not be part of an existing colocation group. Therefore,
		 * we can directly use the ShardReplicationFactor global variable here.
		 */
		CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor,
										 useExclusiveConnection);
	}
}


/*
 * ColocationIdForNewTable returns a colocation id for a hash-distributed
 * table according to the given configuration. If there is no such
 * configuration, it creates one and returns the colocation id of the newly
 * created colocation group. For append and range distributed tables, this
 * function errors out if the colocateWithTableName parameter is not
 * "default", otherwise it directly returns INVALID_COLOCATION_ID.
 *
 * This function assumes its caller takes the necessary lock on relationId
 * to prevent possible changes on it.
 */
static uint32
ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
						char distributionMethod, char replicationModel,
						int shardCount, bool shardCountIsStrict,
						char *colocateWithTableName, bool viaDeprecatedAPI)
{
	uint32 colocationId = INVALID_COLOCATION_ID;

	if (viaDeprecatedAPI)
	{
		return colocationId;
	}
	else if (distributionMethod == DISTRIBUTE_BY_APPEND ||
			 distributionMethod == DISTRIBUTE_BY_RANGE)
	{
		if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0)
		{
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("cannot distribute relation"),
							errdetail("Currently, colocate_with option is only supported "
									  "for hash distributed tables.")));
		}

		return colocationId;
	}
	else if (distributionMethod == DISTRIBUTE_BY_NONE)
	{
		return CreateReferenceTableColocationId();
	}
	else
	{
		/*
		 * Get an exclusive lock on the colocation system catalog. Therefore, we
		 * can be sure that there will be no modifications on the colocation table
		 * until this transaction is committed.
		 */
		Assert(distributionMethod == DISTRIBUTE_BY_HASH);

		Relation pgDistColocation = table_open(DistColocationRelationId(), ExclusiveLock);

		Oid distributionColumnType = distributionColumn->vartype;
		Oid distributionColumnCollation = get_typcollation(distributionColumnType);
		bool createdColocationGroup = false;

		if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
		{
			/* check for a default colocation group */
			colocationId = ColocationId(shardCount, ShardReplicationFactor,
										distributionColumnType,
										distributionColumnCollation);

			/*
			 * If the shardCount is strict, then we check whether the shard
			 * count of the colocated table is actually shardCount.
			 */
			if (shardCountIsStrict && colocationId != INVALID_COLOCATION_ID)
			{
				Oid colocatedTableId = ColocatedTableId(colocationId);

				if (colocatedTableId != InvalidOid)
				{
					CitusTableCacheEntry *cacheEntry =
						GetCitusTableCacheEntry(colocatedTableId);
					int colocatedTableShardCount = cacheEntry->shardIntervalArrayLength;

					if (colocatedTableShardCount != shardCount)
					{
						colocationId = INVALID_COLOCATION_ID;
					}
				}
			}

			if (colocationId == INVALID_COLOCATION_ID)
			{
				colocationId = CreateColocationGroup(shardCount, ShardReplicationFactor,
													 distributionColumnType,
													 distributionColumnCollation);
				createdColocationGroup = true;
			}
		}
		else if (IsColocateWithNone(colocateWithTableName))
		{
			colocationId = GetNextColocationId();

			createdColocationGroup = true;
		}
		else
		{
			text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
			Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText, false);

			EnsureTableCanBeColocatedWith(relationId, replicationModel,
										  distributionColumnType, sourceRelationId);

			colocationId = TableColocationId(sourceRelationId);
		}

		/*
		 * If we created a new colocation group then we need to keep the lock to
		 * prevent a concurrent create_distributed_table call from creating another
		 * colocation group with the same parameters. If we're using an existing
		 * colocation group then other transactions will use the same one.
		 */
		if (createdColocationGroup)
		{
			/* keep the exclusive lock */
			table_close(pgDistColocation, NoLock);
		}
		else
		{
			/* release the exclusive lock */
			table_close(pgDistColocation, ExclusiveLock);
		}
	}

	return colocationId;
}


/*
 * EnsureRelationCanBeDistributed checks whether Citus can safely distribute
 * the given relation with the given configuration. We perform almost all
 * safety checks for distributing a table here. If there is an unsatisfied
 * requirement, we error out and do not distribute the table.
 *
 * This function assumes callers have already acquired the necessary locks
 * to ensure there will not be any change in the given relation.
 */
static void
EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
							   char distributionMethod, uint32 colocationId,
							   char replicationModel, bool viaDeprecatedAPI)
{
	Oid parentRelationId = InvalidOid;

	EnsureTableNotDistributed(relationId);
	EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod, viaDeprecatedAPI);
	EnsureRelationHasNoTriggers(relationId);

	/* we assume callers took the necessary locks */
	Relation relation = relation_open(relationId, NoLock);
	TupleDesc relationDesc = RelationGetDescr(relation);
	char *relationName = RelationGetRelationName(relation);

	ErrorIfTableIsACatalogTable(relation);

	/* verify target relation does not use identity columns */
	if (RelationUsesIdentityColumns(relationDesc))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot distribute relation: %s", relationName),
						errdetail("Distributed relations must not use GENERATED "
								  "... AS IDENTITY.")));
	}

	/* verify target relation is not distributed by a generated column */
	if (distributionMethod != DISTRIBUTE_BY_NONE &&
		DistributionColumnUsesGeneratedStoredColumn(relationDesc, distributionColumn))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot distribute relation: %s", relationName),
						errdetail("Distribution column must not use GENERATED ALWAYS "
								  "AS (...) STORED.")));
	}

	/* check for the support function needed by the specified partition method */
	if (distributionMethod == DISTRIBUTE_BY_HASH)
	{
		Oid hashSupportFunction = SupportFunctionForColumn(distributionColumn,
														   HASH_AM_OID,
														   HASHSTANDARD_PROC);
		if (hashSupportFunction == InvalidOid)
		{
			ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
							errmsg("could not identify a hash function for type %s",
								   format_type_be(distributionColumn->vartype)),
							errdatatype(distributionColumn->vartype),
							errdetail("Partition column types must have a hash function "
									  "defined to use hash partitioning.")));
		}

		if (distributionColumn->varcollid != InvalidOid &&
			!get_collation_isdeterministic(distributionColumn->varcollid))
		{
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("Hash distributed partition columns may not use "
								   "a non-deterministic collation")));
		}
	}
	else if (distributionMethod == DISTRIBUTE_BY_RANGE)
	{
		Oid btreeSupportFunction = SupportFunctionForColumn(distributionColumn,
															BTREE_AM_OID, BTORDER_PROC);
		if (btreeSupportFunction == InvalidOid)
		{
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_FUNCTION),
					 errmsg("could not identify a comparison function for type %s",
							format_type_be(distributionColumn->vartype)),
					 errdatatype(distributionColumn->vartype),
					 errdetail("Partition column types must have a comparison function "
							   "defined to use range partitioning.")));
		}
	}

	if (PartitionTable(relationId))
	{
		parentRelationId = PartitionParentOid(relationId);
	}

	/* partitions cannot be distributed if their parent is not distributed */
	if (PartitionTable(relationId) && !IsCitusTable(parentRelationId))
	{
		char *parentRelationName = get_rel_name(parentRelationId);

		ereport(ERROR, (errmsg("cannot distribute relation \"%s\" which is partition of "
							   "\"%s\"", relationName, parentRelationName),
						errdetail("Citus does not support distributing partitions "
								  "if their parent is not a distributed table."),
						errhint("Distribute the partitioned table \"%s\" instead.",
								parentRelationName)));
	}

	/*
	 * These checks are mostly for partitioned tables, not partitions, because we
	 * prevent distributing partitions directly in the above check. However,
	 * partitions can still reach this point because we call CreateDistributedTable
	 * for partitions if their parent table is distributed.
	 */
	if (PartitionedTable(relationId))
	{
		/* we cannot distribute partitioned tables with master_create_distributed_table */
		if (viaDeprecatedAPI)
		{
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("distributing partitioned tables is only supported "
								   "with the create_distributed_table UDF")));
		}

		/* distributing partitioned tables is only supported for hash-distribution */
		if (distributionMethod != DISTRIBUTE_BY_HASH)
		{
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("distributing partitioned tables is only supported "
								   "for hash-distributed tables")));
		}

		/* we don't support distributing tables with multi-level partitioning */
		if (PartitionTable(relationId))
		{
			char *parentRelationName = get_rel_name(parentRelationId);

			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("distributing multi-level partitioned tables "
								   "is not supported"),
							errdetail("Relation \"%s\" is a partitioned table itself and "
									  "it is also a partition of relation \"%s\".",
									  relationName, parentRelationName)));
		}
	}

	ErrorIfUnsupportedConstraint(relation, distributionMethod, replicationModel,
								 distributionColumn, colocationId);

	ErrorIfUnsupportedPolicy(relation);
	relation_close(relation, NoLock);
}


/*
 * ErrorIfTableIsACatalogTable is a helper function to error out for citus
 * table creation from a catalog table.
 */
void
ErrorIfTableIsACatalogTable(Relation relation)
{
	if (relation->rd_rel->relnamespace != PG_CATALOG_NAMESPACE)
	{
		return;
	}

	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot create a citus table from a catalog table")));
}


/*
 * EnsureTableCanBeColocatedWith checks whether the given replication model
 * and distribution column type are suitable to distribute a table to be
 * colocated with the given source table.
 *
 * We only pass relationId to provide meaningful error messages.
 */
static void
EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
							  Oid distributionColumnType, Oid sourceRelationId)
{
	CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId);
	char sourceReplicationModel = sourceTableEntry->replicationModel;
	Var *sourceDistributionColumn = DistPartitionKeyOrError(sourceRelationId);

	if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot distribute relation"),
						errdetail("Currently, colocate_with option is only supported "
								  "for hash distributed tables.")));
	}

	if (sourceReplicationModel != replicationModel)
	{
		char *relationName = get_rel_name(relationId);
		char *sourceRelationName = get_rel_name(sourceRelationId);

		ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
							   sourceRelationName, relationName),
						errdetail("Replication models don't match for %s and %s.",
								  sourceRelationName, relationName)));
	}

	Oid sourceDistributionColumnType = sourceDistributionColumn->vartype;
	if (sourceDistributionColumnType != distributionColumnType)
	{
		char *relationName = get_rel_name(relationId);
		char *sourceRelationName = get_rel_name(sourceRelationId);

		ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
							   sourceRelationName, relationName),
						errdetail("Distribution column types don't match for "
								  "%s and %s.", sourceRelationName,
								  relationName)));
	}
}


/*
 * EnsureLocalTableEmptyIfNecessary errors out if the local table should be
 * empty according to ShouldLocalTableBeEmpty, but it is not.
 */
static void
EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod,
								 bool viaDeprecatedAPI)
{
	if (ShouldLocalTableBeEmpty(relationId, distributionMethod, viaDeprecatedAPI))
	{
		EnsureLocalTableEmpty(relationId);
	}
}


/*
 * ShouldLocalTableBeEmpty returns true if the local table should be empty
 * before creating a citus table.
 * In some cases, it is possible and safe to send local data to shards while
 * distributing the table. In those cases, we can distribute non-empty local
 * tables. This function checks the distributionMethod and relation kind to
 * see whether we need to ensure the emptiness of the local table.
 */
static bool
ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod,
						bool viaDeprecatedAPI)
{
	bool shouldLocalTableBeEmpty = false;
	if (viaDeprecatedAPI)
	{
		/* we don't support copying local data via the deprecated API */
		shouldLocalTableBeEmpty = true;
	}
	else if (distributionMethod != DISTRIBUTE_BY_HASH &&
			 distributionMethod != DISTRIBUTE_BY_NONE)
	{
		/*
		 * We only support hash distributed tables and reference tables
		 * for initial data loading.
		 */
		shouldLocalTableBeEmpty = true;
	}
	else if (!RegularTable(relationId))
	{
		/*
		 * We only support tables and partitioned tables for initial
		 * data loading.
		 */
		shouldLocalTableBeEmpty = true;
	}

	return shouldLocalTableBeEmpty;
}


/*
 * EnsureLocalTableEmpty errors out if the local table is not empty.
 */
static void
EnsureLocalTableEmpty(Oid relationId)
{
	char *relationName = get_rel_name(relationId);
	bool localTableEmpty = TableEmpty(relationId);

	if (!localTableEmpty)
	{
		ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
						errmsg("cannot distribute relation \"%s\"", relationName),
						errdetail("Relation \"%s\" contains data.", relationName),
						errhint("Empty your table before distributing it.")));
	}
}


/*
 * EnsureTableNotDistributed errors out if the table is already a citus table.
 */
void
EnsureTableNotDistributed(Oid relationId)
{
	char *relationName = get_rel_name(relationId);

	bool isCitusTable = IsCitusTable(relationId);

	if (isCitusTable)
	{
		ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
						errmsg("table \"%s\" is already distributed",
							   relationName)));
	}
}


/*
 * EnsureRelationHasNoTriggers errors out if the given table has triggers on
 * it. See also the comment of GetExplicitTriggerIdList for the triggers that
 * this function errors out for.
 */
static void
EnsureRelationHasNoTriggers(Oid relationId)
{
	List *explicitTriggerIds = GetExplicitTriggerIdList(relationId);

	if (list_length(explicitTriggerIds) > 0)
	{
		char *relationName = get_rel_name(relationId);

		Assert(relationName != NULL);
		ereport(ERROR, (errmsg("cannot distribute relation \"%s\" because it has "
							   "triggers", relationName),
						errdetail("Citus does not support distributing tables with "
								  "triggers."),
						errhint("Drop all the triggers on \"%s\" and retry.",
								relationName)));
	}
}


/*
 * LookupDistributionMethod maps the oids of citus.distribution_type enum
 * values to pg_dist_partition.partmethod values.
 *
 * The passed in oid has to belong to a value of citus.distribution_type.
 */
char
LookupDistributionMethod(Oid distributionMethodOid)
{
	char distributionMethod = 0;

	HeapTuple enumTuple = SearchSysCache1(ENUMOID, ObjectIdGetDatum(
											  distributionMethodOid));
	if (!HeapTupleIsValid(enumTuple))
	{
		ereport(ERROR, (errmsg("invalid internal value for enum: %u",
							   distributionMethodOid)));
	}

	Form_pg_enum enumForm = (Form_pg_enum) GETSTRUCT(enumTuple);
	const char *enumLabel = NameStr(enumForm->enumlabel);

	if (strncmp(enumLabel, "append", NAMEDATALEN) == 0)
	{
		distributionMethod = DISTRIBUTE_BY_APPEND;
	}
	else if (strncmp(enumLabel, "hash", NAMEDATALEN) == 0)
	{
		distributionMethod = DISTRIBUTE_BY_HASH;
	}
	else if (strncmp(enumLabel, "range", NAMEDATALEN) == 0)
	{
		distributionMethod = DISTRIBUTE_BY_RANGE;
	}
	else
	{
		ereport(ERROR, (errmsg("invalid label for enum: %s", enumLabel)));
	}

	ReleaseSysCache(enumTuple);

	return distributionMethod;
}


/*
 * SupportFunctionForColumn locates a support function given a column, an
 * access method, and the id of a support function. This function returns
 * InvalidOid if there is no support function for the operator class family
 * of the column, but if the data type of the column has no default operator
 * class whatsoever, this function errors out.
 */
static Oid
SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
						 int16 supportFunctionNumber)
{
	Oid columnOid = partitionColumn->vartype;
	Oid operatorClassId = GetDefaultOpClass(columnOid, accessMethodId);

	/* currently only support using the default operator class */
	if (operatorClassId == InvalidOid)
	{
		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
						errmsg("data type %s has no default operator class for specified"
							   " partition method", format_type_be(columnOid)),
						errdatatype(columnOid),
						errdetail("Partition column types must have a default operator"
								  " class defined.")));
	}

	Oid operatorFamilyId = get_opclass_family(operatorClassId);
	Oid operatorClassInputType = get_opclass_input_type(operatorClassId);
	Oid supportFunctionOid = get_opfamily_proc(operatorFamilyId, operatorClassInputType,
											   operatorClassInputType,
											   supportFunctionNumber);

	return supportFunctionOid;
}


/*
 * TableEmpty checks whether the given table contains any rows. It returns
 * false if the table contains any data, and true otherwise.
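 *
 * Note that the check runs the SELECT_TRUE_QUERY (a query of the form
 * "SELECT TRUE FROM <table> LIMIT 1") through SPI and inspects whether any
 * row was produced, rather than scanning the table directly.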
 */
bool
TableEmpty(Oid tableId)
{
	Oid schemaId = get_rel_namespace(tableId);
	char *schemaName = get_namespace_name(schemaId);
	char *tableName = get_rel_name(tableId);
	char *tableQualifiedName = quote_qualified_identifier(schemaName, tableName);

	StringInfo selectTrueQueryString = makeStringInfo();

	bool readOnly = true;

	int spiConnectionResult = SPI_connect();
	if (spiConnectionResult != SPI_OK_CONNECT)
	{
		ereport(ERROR, (errmsg("could not connect to SPI manager")));
	}

	appendStringInfo(selectTrueQueryString, SELECT_TRUE_QUERY, tableQualifiedName);

	int spiQueryResult = SPI_execute(selectTrueQueryString->data, readOnly, 0);
	if (spiQueryResult != SPI_OK_SELECT)
	{
		ereport(ERROR, (errmsg("execution was not successful \"%s\"",
							   selectTrueQueryString->data)));
	}

	/*
	 * We expect that the SELECT TRUE query will return a single value in a
	 * single row, OR an empty set.
	 */
	Assert(SPI_processed == 1 || SPI_processed == 0);

	bool localTableEmpty = !SPI_processed;

	SPI_finish();

	return localTableEmpty;
}


/*
 * CanUseExclusiveConnections checks if we can open parallel connections
 * while creating shards. We simply error out if we need to execute
 * sequentially but there is data in the table, since we cannot copy the
 * data to shards sequentially.
 */
static bool
CanUseExclusiveConnections(Oid relationId, bool localTableEmpty)
{
	bool hasForeignKeyToReferenceTable = HasForeignKeyToReferenceTable(relationId);
	bool shouldRunSequential = MultiShardConnectionType == SEQUENTIAL_CONNECTION ||
							   hasForeignKeyToReferenceTable;

	if (!localTableEmpty && shouldRunSequential)
	{
		char *relationName = get_rel_name(relationId);

		ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode "
							   "because it is not empty", relationName),
						errhint("If you have manually set "
								"citus.multi_shard_modify_mode to 'sequential', "
								"try with 'parallel' option. If that is not the "
								"case, try distributing local tables when they "
								"are empty.")));
	}
	else if (shouldRunSequential && ParallelQueryExecutedInTransaction())
	{
		/*
		 * We decided to use sequential execution. It's either because the
		 * relation has a pre-existing foreign key to a reference table, or
		 * because we decided to use sequential execution due to a query
		 * executed in the current xact beforehand.
		 * We have specific error messages for either case.
		 */

		char *relationName = get_rel_name(relationId);

		if (hasForeignKeyToReferenceTable)
		{
			/*
			 * If there has already been a parallel query executed, the sequential mode
			 * would still use the already opened parallel connections to the workers,
			 * thus contradicting our purpose of using sequential mode.
			 */
			ereport(ERROR, (errmsg("cannot distribute relation \"%s\" in this "
								   "transaction because it has a foreign key to "
								   "a reference table", relationName),
							errdetail("If a hash distributed table has a foreign key "
									  "to a reference table, it has to be created "
									  "in sequential mode before any parallel commands "
									  "have been executed in the same transaction"),
							errhint("Try re-running the transaction with "
									"\"SET LOCAL citus.multi_shard_modify_mode TO "
									"\'sequential\';\"")));
		}
		else if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
		{
			ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode because "
								   "a parallel query was executed in this transaction",
								   relationName),
							errhint("If you have manually set "
									"citus.multi_shard_modify_mode to 'sequential', "
									"try with 'parallel' option.")));
		}
	}
	else if (shouldRunSequential)
	{
		return false;
	}
	else if (!localTableEmpty || IsMultiStatementTransaction())
	{
		return true;
	}

	return false;
}


/*
 * CreateTruncateTrigger creates a truncate trigger on the table identified by
 * relationId and assigns citus_truncate_trigger() as its handler.
1560 */
1561 void
CreateTruncateTrigger(Oid relationId)1562 CreateTruncateTrigger(Oid relationId)
1563 {
1564 StringInfo triggerName = makeStringInfo();
1565 bool internal = true;
1566
1567 appendStringInfo(triggerName, "truncate_trigger");
1568
1569 CreateTrigStmt *trigger = makeNode(CreateTrigStmt);
1570 trigger->trigname = triggerName->data;
1571 trigger->relation = NULL;
1572 trigger->funcname = SystemFuncName(CITUS_TRUNCATE_TRIGGER_NAME);
1573 trigger->args = NIL;
1574 trigger->row = false;
1575 trigger->timing = TRIGGER_TYPE_AFTER;
1576 trigger->events = TRIGGER_TYPE_TRUNCATE;
1577 trigger->columns = NIL;
1578 trigger->whenClause = NULL;
1579 trigger->isconstraint = false;
1580
1581 CreateTrigger(trigger, NULL, relationId, InvalidOid, InvalidOid, InvalidOid,
1582 InvalidOid, InvalidOid, NULL,
1583 internal, false);
1584 }
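
/*
 * For illustration, the CreateTrigStmt built above (row = false makes it
 * statement-level) corresponds roughly to the following DDL; the exact
 * handler name comes from CITUS_TRUNCATE_TRIGGER_NAME, so this is a sketch:
 *
 *   CREATE TRIGGER truncate_trigger
 *   AFTER TRUNCATE ON <table>
 *   FOR EACH STATEMENT
 *   EXECUTE FUNCTION pg_catalog.citus_truncate_trigger();
 */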


/*
 * RegularTable returns true if the given table's relation kind is
 * RELKIND_RELATION or RELKIND_PARTITIONED_TABLE; otherwise it returns false.
 */
bool
RegularTable(Oid relationId)
{
	char relationKind = get_rel_relkind(relationId);

	if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE)
	{
		return true;
	}

	return false;
}
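
/*
 * For example, views (RELKIND_VIEW) and foreign tables
 * (RELKIND_FOREIGN_TABLE) are not regular tables, so RegularTable returns
 * false for them, while plain heap tables and partitioned tables return
 * true.
 */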


/*
 * CopyLocalDataIntoShards copies data from the local table, which is hidden
 * after converting it to a distributed table, into the shards of the
 * distributed table. For partitioned tables, this function returns without
 * copying the data, because we call this function for both partitioned tables
 * and their partitions. Returning early saves us from copying the data to the
 * workers twice.
 *
 * This function uses CitusCopyDestReceiver to invoke the distributed COPY
 * logic. We cannot use a regular COPY here since that cannot read from a
 * table. Instead we read from the table and pass each tuple to the
 * CitusCopyDestReceiver, which opens a connection and starts a COPY for each
 * shard placement that will have data.
 *
 * We could call the planner and executor here and send the output to the
 * DestReceiver, but we are in a tricky spot here since Citus is already
 * intercepting queries on this table in the planner and executor hooks and we
 * want to read from the local table. To keep it simple, we perform a heap scan
 * directly on the table.
 *
 * Any writes on the table that are started during this operation will be
 * handled as distributed queries once the current transaction commits.
 * SELECTs will continue to read from the local table until the current
 * transaction commits, after which new SELECTs will be handled as distributed
 * queries.
 *
 * After copying local data into the distributed table, the local data remains
 * in place and should be truncated at a later time.
 */
static void
CopyLocalDataIntoShards(Oid distributedRelationId)
{
	/* take an ExclusiveLock to block all operations except SELECT */
	Relation distributedRelation = table_open(distributedRelationId, ExclusiveLock);

	/*
	 * Skip copying from partitioned tables; we will copy the data from each
	 * partition into the partition's shards instead.
	 */
	if (PartitionedTable(distributedRelationId))
	{
		table_close(distributedRelation, NoLock);

		return;
	}

	/*
	 * All writes have finished, make sure that we can see them by using the
	 * latest snapshot. We use GetLatestSnapshot instead of
	 * GetTransactionSnapshot since the latter would not reveal all writes
	 * in serializable or repeatable read mode. Note that subsequent reads
	 * from the distributed table would reveal those writes, temporarily
	 * violating the isolation level. However, this seems preferable over
	 * dropping the writes entirely.
	 */
	PushActiveSnapshot(GetLatestSnapshot());

	/* get the table columns */
	TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation);
	TupleTableSlot *slot = CreateTableSlotForRel(distributedRelation);
	List *columnNameList = TupleDescColumnNameList(tupleDescriptor);

	int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;

	/* determine the partition column in the tuple descriptor */
	Var *partitionColumn = PartitionColumn(distributedRelationId, 0);
	if (partitionColumn != NULL)
	{
		partitionColumnIndex = partitionColumn->varattno - 1;
	}

	/* initialise per-tuple memory context */
	EState *estate = CreateExecutorState();
	ExprContext *econtext = GetPerTupleExprContext(estate);
	econtext->ecxt_scantuple = slot;

	bool stopOnFailure = true;
	DestReceiver *copyDest =
		(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
													 columnNameList,
													 partitionColumnIndex,
													 estate, stopOnFailure,
													 NULL);

	/* initialise state for writing to shards, we'll open connections on demand */
	copyDest->rStartup(copyDest, 0, tupleDescriptor);

	DoCopyFromLocalTableIntoShards(distributedRelation, copyDest, slot, estate);

	/* finish writing into the shards */
	copyDest->rShutdown(copyDest);
	copyDest->rDestroy(copyDest);

	/* free memory and close the relation */
	ExecDropSingleTupleTableSlot(slot);
	FreeExecutorState(estate);
	table_close(distributedRelation, NoLock);

	PopActiveSnapshot();
}
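
/*
 * To make the visibility rules above concrete, a sketch of a session
 * (table and column names are hypothetical):
 *
 *   BEGIN;
 *   SELECT create_distributed_table('events', 'tenant_id');
 *   -- within this transaction, SELECTs still read the local heap
 *   SELECT count(*) FROM events;
 *   COMMIT;
 *   -- after commit, SELECTs are planned as distributed queries and read
 *   -- from the shards; the local data is no longer visible but remains
 *   -- on disk until truncated
 */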


/*
 * DoCopyFromLocalTableIntoShards performs a copy operation
 * from local tables into shards.
 */
static void
DoCopyFromLocalTableIntoShards(Relation distributedRelation,
							   DestReceiver *copyDest,
							   TupleTableSlot *slot,
							   EState *estate)
{
	/* begin reading from local table */
	TableScanDesc scan = table_beginscan(distributedRelation, GetActiveSnapshot(), 0,
										 NULL);

	MemoryContext oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

	uint64 rowsCopied = 0;
	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
	{
		/* send the tuple to a shard */
		copyDest->receiveSlot(slot, copyDest);

		/* clear tuple memory */
		ResetPerTupleExprContext(estate);

		/* make sure we roll back on cancellation */
		CHECK_FOR_INTERRUPTS();

		if (rowsCopied == 0)
		{
			ereport(NOTICE, (errmsg("Copying data from local table...")));
		}

		rowsCopied++;

		if (rowsCopied % LOG_PER_TUPLE_AMOUNT == 0)
		{
			ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
		}
	}

	if (rowsCopied % LOG_PER_TUPLE_AMOUNT != 0)
	{
		ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
	}

	if (rowsCopied > 0)
	{
		char *qualifiedRelationName =
			generate_qualified_relation_name(RelationGetRelid(distributedRelation));
		ereport(NOTICE, (errmsg("copying the data has completed"),
						 errdetail("The local data in the table is no longer visible, "
								   "but is still on disk."),
						 errhint("To remove the local data, run: SELECT "
								 "truncate_local_data_after_distributing_table($$%s$$)",
								 qualifiedRelationName)));
	}

	MemoryContextSwitchTo(oldContext);

	/* finish reading from the local table */
	table_endscan(scan);
}
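
/*
 * Following the hint emitted above, the leftover local data can be removed
 * once the table is distributed; for a hypothetical table public.events:
 *
 *   SELECT truncate_local_data_after_distributing_table($$public.events$$);
 */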


/*
 * TupleDescColumnNameList returns a list of column names for the given tuple
 * descriptor as plain strings. Dropped columns and GENERATED ... STORED
 * columns are skipped, since no values are sent for them during the copy.
 */
static List *
TupleDescColumnNameList(TupleDesc tupleDescriptor)
{
	List *columnNameList = NIL;

	for (int columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++)
	{
		Form_pg_attribute currentColumn = TupleDescAttr(tupleDescriptor, columnIndex);
		char *columnName = NameStr(currentColumn->attname);

		if (currentColumn->attisdropped ||
			currentColumn->attgenerated == ATTRIBUTE_GENERATED_STORED)
		{
			continue;
		}

		columnNameList = lappend(columnNameList, columnName);
	}

	return columnNameList;
}
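
/*
 * For example, for a hypothetical table
 *
 *   CREATE TABLE items (a int, b int GENERATED ALWAYS AS (a * 2) STORED);
 *
 * this returns only "a": the stored generated column b is skipped, and its
 * values are recomputed from the generation expression when rows are
 * inserted into the shards.
 */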


/*
 * RelationUsesIdentityColumns returns whether a given relation uses
 * GENERATED ... AS IDENTITY
 */
bool
RelationUsesIdentityColumns(TupleDesc relationDesc)
{
	for (int attributeIndex = 0; attributeIndex < relationDesc->natts; attributeIndex++)
	{
		Form_pg_attribute attributeForm = TupleDescAttr(relationDesc, attributeIndex);

		if (attributeForm->attidentity != '\0')
		{
			return true;
		}
	}

	return false;
}
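
/*
 * For reference, pg_attribute.attidentity is '\0' for ordinary columns and
 * non-zero ('a' for GENERATED ALWAYS, 'd' for GENERATED BY DEFAULT) for
 * identity columns, so a hypothetical table such as
 *
 *   CREATE TABLE users (id bigint GENERATED ALWAYS AS IDENTITY, name text);
 *
 * makes this function return true.
 */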


/*
 * DistributionColumnUsesGeneratedStoredColumn returns whether a given relation
 * uses GENERATED ALWAYS AS (...) STORED on its distribution column.
 */
static bool
DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
											Var *distributionColumn)
{
	Form_pg_attribute attributeForm = TupleDescAttr(relationDesc,
													distributionColumn->varattno - 1);

	if (attributeForm->attgenerated == ATTRIBUTE_GENERATED_STORED)
	{
		return true;
	}

	return false;
}
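
/*
 * For example, choosing the generated column of the hypothetical table
 *
 *   CREATE TABLE sensors (id int, shard_key int GENERATED ALWAYS AS (id) STORED);
 *
 * as the distribution column would be detected by this check; such columns
 * are computed by PostgreSQL on insert, so callers can use this to reject
 * them as distribution columns.
 */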