1 /*
2 * multi_partitioning_utils.c
3 * Utility functions for declarative partitioning
4 *
5 * Copyright (c) Citus Data, Inc.
6 */
7 #include "postgres.h"
8
9 #include "distributed/pg_version_constants.h"
10
11 #include "access/genam.h"
12 #include "access/heapam.h"
13 #include "access/htup_details.h"
14 #include "catalog/index.h"
15 #include "catalog/indexing.h"
16 #include "catalog/partition.h"
17 #include "catalog/pg_class.h"
18 #include "catalog/pg_constraint.h"
19 #include "catalog/pg_inherits.h"
20 #include "commands/tablecmds.h"
21 #include "common/string.h"
22 #include "distributed/citus_nodes.h"
23 #include "distributed/adaptive_executor.h"
24 #include "distributed/citus_ruleutils.h"
25 #include "distributed/colocation_utils.h"
26 #include "distributed/commands.h"
27 #include "distributed/coordinator_protocol.h"
28 #include "distributed/deparse_shard_query.h"
29 #include "distributed/listutils.h"
30 #include "distributed/metadata_utility.h"
31 #include "distributed/multi_executor.h"
32 #include "distributed/multi_partitioning_utils.h"
33 #include "distributed/multi_physical_planner.h"
34 #include "distributed/relay_utility.h"
35 #include "distributed/resource_lock.h"
36 #include "distributed/shardinterval_utils.h"
37 #include "distributed/version_compat.h"
38 #include "distributed/worker_protocol.h"
39 #include "lib/stringinfo.h"
40 #include "nodes/makefuncs.h"
41 #include "nodes/pg_list.h"
42 #include "pgstat.h"
43 #include "partitioning/partdesc.h"
44 #include "utils/builtins.h"
45 #include "utils/fmgroids.h"
46 #include "utils/lsyscache.h"
47 #include "utils/rel.h"
48 #include "utils/syscache.h"
49 #include "utils/varlena.h"
50
/* forward declarations of file-local (static) helpers */
static char * PartitionBound(Oid partitionId);
static Relation try_relation_open_nolock(Oid relationId);
static List * CreateFixPartitionConstraintsTaskList(Oid relationId);
static List * WorkerFixPartitionConstraintCommandList(Oid relationId, uint64 shardId,
													  List *checkConstraintList);
static List * CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId);
static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
														   List *indexIdList);
static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
	char *qualifiedParentShardIndexName, Oid parentIndexId);
static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid
																			partitionIndexId,
																			char *
																			qualifiedParentShardIndexName);
static List * CheckConstraintNameListForRelation(Oid relationId);
static bool RelationHasConstraint(Oid relationId, char *constraintName);
static char * RenameConstraintCommand(Oid relationId, char *constraintName,
									  char *newConstraintName);


/* SQL-callable UDFs defined in this file (version-1 calling convention) */
PG_FUNCTION_INFO_V1(fix_pre_citus10_partitioned_table_constraint_names);
PG_FUNCTION_INFO_V1(worker_fix_pre_citus10_partitioned_table_constraint_names);
PG_FUNCTION_INFO_V1(fix_partition_shard_index_names);
PG_FUNCTION_INFO_V1(worker_fix_partition_shard_index_names);
75
76
77 /*
78 * fix_pre_citus10_partitioned_table_constraint_names fixes the constraint names of
79 * partitioned table shards on workers.
80 *
81 * Constraint names for partitioned table shards should have shardId suffixes if and only
82 * if they are unique or foreign key constraints. We mistakenly appended shardIds to
83 * constraint names on ALTER TABLE dist_part_table ADD CONSTRAINT .. queries prior to
84 * Citus 10. fix_pre_citus10_partitioned_table_constraint_names determines if this is the
85 * case, and renames constraints back to their original names on shards.
86 */
87 Datum
fix_pre_citus10_partitioned_table_constraint_names(PG_FUNCTION_ARGS)88 fix_pre_citus10_partitioned_table_constraint_names(PG_FUNCTION_ARGS)
89 {
90 Oid relationId = PG_GETARG_OID(0);
91 EnsureCoordinator();
92
93 if (!PartitionedTable(relationId))
94 {
95 ereport(ERROR, (errmsg("could not fix partition constraints: "
96 "relation does not exist or is not partitioned")));
97 }
98 if (!IsCitusTable(relationId))
99 {
100 ereport(ERROR, (errmsg("fix_pre_citus10_partitioned_table_constraint_names can "
101 "only be called for distributed partitioned tables")));
102 }
103
104 List *taskList = CreateFixPartitionConstraintsTaskList(relationId);
105
106 /* do not do anything if there are no constraints that should be fixed */
107 if (taskList != NIL)
108 {
109 bool localExecutionSupported = true;
110 ExecuteUtilityTaskList(taskList, localExecutionSupported);
111 }
112
113 PG_RETURN_VOID();
114 }
115
116
117 /*
118 * worker_fix_pre_citus10_partitioned_table_constraint_names fixes the constraint names on a worker given a shell
119 * table name and shard id.
120 */
121 Datum
worker_fix_pre_citus10_partitioned_table_constraint_names(PG_FUNCTION_ARGS)122 worker_fix_pre_citus10_partitioned_table_constraint_names(PG_FUNCTION_ARGS)
123 {
124 Oid relationId = PG_GETARG_OID(0);
125 int64 shardId = PG_GETARG_INT32(1);
126 text *constraintNameText = PG_GETARG_TEXT_P(2);
127
128 if (!PartitionedTable(relationId))
129 {
130 ereport(ERROR, (errmsg("could not fix partition constraints: "
131 "relation does not exist or is not partitioned")));
132 }
133
134 char *constraintName = text_to_cstring(constraintNameText);
135 char *shardIdAppendedConstraintName = pstrdup(constraintName);
136 AppendShardIdToName(&shardIdAppendedConstraintName, shardId);
137
138 /* if shardId was appended to the constraint name, rename back to original */
139 if (RelationHasConstraint(relationId, shardIdAppendedConstraintName))
140 {
141 char *renameConstraintDDLCommand =
142 RenameConstraintCommand(relationId, shardIdAppendedConstraintName,
143 constraintName);
144 ExecuteAndLogUtilityCommand(renameConstraintDDLCommand);
145 }
146 PG_RETURN_VOID();
147 }
148
149
150 /*
151 * fix_partition_shard_index_names fixes the index names of shards of partitions of
152 * partitioned tables on workers.
153 *
154 * When running CREATE INDEX on parent_table, we didn't explicitly create the index on
155 * each partition as well. Postgres created indexes for partitions in the coordinator,
156 * and also in the workers. Actually, Postgres auto-generates index names when auto-creating
157 * indexes on each partition shard of the parent shards. If index name is too long, it
158 * truncates the name and adds _idx postfix to it. However, when truncating the name, the
159 * shardId of the partition shard can be lost. This may result in the same index name used for
160 * the partition shell table and one of the partition shards.
161 * For more details, check issue #4962 https://github.com/citusdata/citus/issues/4962
162 *
163 * fix_partition_shard_index_names renames indexes of shards of partition tables to include
164 * the shardId at the end of the name, regardless of whether index name was long or short
165 * As a result there will be no index name ending in _idx, rather all will end in _{shardid}
166 * Algorithm is:
167 * foreach parentShard in shardListOfParentTableId:
168 * foreach parentIndex on parent:
169 * generate qualifiedParentShardIndexName -> parentShardIndex
170 * foreach inheritedPartitionIndex on parentIndex:
171 * get table relation of inheritedPartitionIndex -> partitionId
172 * foreach partitionShard in shardListOfPartitionid:
173 * generate qualifiedPartitionShardName -> partitionShard
174 * generate newPartitionShardIndexName
175 * (the following happens in the worker node)
176 * foreach inheritedPartitionShardIndex on parentShardIndex:
177 * if table relation of inheritedPartitionShardIndex is partitionShard:
178 * if inheritedPartitionShardIndex does not have proper name:
179 * Rename(inheritedPartitionShardIndex, newPartitionShardIndexName)
180 * break
181 */
182 Datum
fix_partition_shard_index_names(PG_FUNCTION_ARGS)183 fix_partition_shard_index_names(PG_FUNCTION_ARGS)
184 {
185 CheckCitusVersion(ERROR);
186 EnsureCoordinator();
187
188 Oid relationId = PG_GETARG_OID(0);
189
190 Relation relation = try_relation_open(relationId, AccessExclusiveLock);
191
192 if (relation == NULL)
193 {
194 ereport(NOTICE, (errmsg("relation with OID %u does not exist, skipping",
195 relationId)));
196 PG_RETURN_VOID();
197 }
198
199 if (relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
200 {
201 relation_close(relation, NoLock);
202 ereport(ERROR, (errmsg(
203 "Fixing shard index names is only applicable to partitioned"
204 " tables, and \"%s\" is not a partitioned table",
205 RelationGetRelationName(relation))));
206 }
207
208 if (!IsCitusTable(relationId))
209 {
210 relation_close(relation, NoLock);
211 ereport(ERROR, (errmsg("fix_partition_shard_index_names can "
212 "only be called for distributed partitioned tables")));
213 }
214
215 EnsureTableOwner(relationId);
216
217 List *taskList = CreateFixPartitionShardIndexNamesTaskList(relationId);
218
219 /* do not do anything if there are no index names to fix */
220 if (taskList != NIL)
221 {
222 bool localExecutionSupported = true;
223 RowModifyLevel modLevel = ROW_MODIFY_NONE;
224 ExecutionParams *execParams = CreateBasicExecutionParams(modLevel, taskList,
225 MaxAdaptiveExecutorPoolSize,
226 localExecutionSupported);
227 ExecuteTaskListExtended(execParams);
228 }
229
230 relation_close(relation, NoLock);
231
232 PG_RETURN_VOID();
233 }
234
235
236 /*
237 * worker_fix_partition_shard_index_names fixes the index name of the index on given
238 * partition shard that has parent the given parent index.
239 * The parent index should be the index of a shard of a distributed partitioned table.
240 */
241 Datum
worker_fix_partition_shard_index_names(PG_FUNCTION_ARGS)242 worker_fix_partition_shard_index_names(PG_FUNCTION_ARGS)
243 {
244 Oid parentShardIndexId = PG_GETARG_OID(0);
245
246 text *partitionShardName = PG_GETARG_TEXT_P(1);
247
248 /* resolve partitionShardId from passed in schema and partition shard name */
249 List *partitionShardNameList = textToQualifiedNameList(partitionShardName);
250 RangeVar *partitionShard = makeRangeVarFromNameList(partitionShardNameList);
251
252 /* lock the relation with the lock mode */
253 bool missing_ok = true;
254 Oid partitionShardId = RangeVarGetRelid(partitionShard, NoLock, missing_ok);
255
256 if (!OidIsValid(partitionShardId))
257 {
258 PG_RETURN_VOID();
259 }
260
261 CheckCitusVersion(ERROR);
262 EnsureTableOwner(partitionShardId);
263
264 text *newPartitionShardIndexNameText = PG_GETARG_TEXT_P(2);
265 char *newPartitionShardIndexName = text_to_cstring(
266 newPartitionShardIndexNameText);
267
268 if (!has_subclass(parentShardIndexId))
269 {
270 ereport(ERROR, (errmsg("could not fix child index names: "
271 "index is not partitioned")));
272 }
273
274 List *partitionShardIndexIds = find_inheritance_children(parentShardIndexId,
275 ShareRowExclusiveLock);
276 Oid partitionShardIndexId = InvalidOid;
277 foreach_oid(partitionShardIndexId, partitionShardIndexIds)
278 {
279 if (IndexGetRelation(partitionShardIndexId, false) == partitionShardId)
280 {
281 char *partitionShardIndexName = get_rel_name(partitionShardIndexId);
282 if (ExtractShardIdFromTableName(partitionShardIndexName, missing_ok) ==
283 INVALID_SHARD_ID)
284 {
285 /*
286 * ExtractShardIdFromTableName will return INVALID_SHARD_ID if
287 * partitionShardIndexName doesn't end in _shardid. In that case,
288 * we want to rename this partition shard index to newPartitionShardIndexName,
289 * which ends in _shardid, hence we maintain naming consistency:
290 * we can reach this partition shard index by conventional Citus naming
291 */
292 RenameStmt *stmt = makeNode(RenameStmt);
293
294 stmt->renameType = OBJECT_INDEX;
295 stmt->missing_ok = false;
296 char *idxNamespace = get_namespace_name(get_rel_namespace(
297 partitionShardIndexId));
298 stmt->relation = makeRangeVar(idxNamespace, partitionShardIndexName, -1);
299 stmt->newname = newPartitionShardIndexName;
300
301 RenameRelation(stmt);
302 }
303 break;
304 }
305 }
306
307 PG_RETURN_VOID();
308 }
309
310
311 /*
312 * CreateFixPartitionConstraintsTaskList goes over all the partitions of a distributed
313 * partitioned table, and creates the list of tasks to execute
314 * worker_fix_pre_citus10_partitioned_table_constraint_names UDF on worker nodes.
315 */
316 static List *
CreateFixPartitionConstraintsTaskList(Oid relationId)317 CreateFixPartitionConstraintsTaskList(Oid relationId)
318 {
319 List *taskList = NIL;
320
321 /* enumerate the tasks when putting them to the taskList */
322 int taskId = 1;
323 List *checkConstraintList = CheckConstraintNameListForRelation(relationId);
324
325 /* early exit if the relation does not have any check constraints */
326 if (checkConstraintList == NIL)
327 {
328 return NIL;
329 }
330
331 List *shardIntervalList = LoadShardIntervalList(relationId);
332
333 /* lock metadata before getting placement lists */
334 LockShardListMetadata(shardIntervalList, ShareLock);
335
336 ShardInterval *shardInterval = NULL;
337 foreach_ptr(shardInterval, shardIntervalList)
338 {
339 uint64 shardId = shardInterval->shardId;
340
341 List *queryStringList = WorkerFixPartitionConstraintCommandList(relationId,
342 shardId,
343 checkConstraintList);
344
345 Task *task = CitusMakeNode(Task);
346 task->jobId = INVALID_JOB_ID;
347 task->taskId = taskId++;
348
349 task->taskType = DDL_TASK;
350 SetTaskQueryStringList(task, queryStringList);
351 task->dependentTaskList = NULL;
352 task->replicationModel = REPLICATION_MODEL_INVALID;
353 task->anchorShardId = shardId;
354 task->taskPlacementList = ActiveShardPlacementList(shardId);
355
356 taskList = lappend(taskList, task);
357 }
358
359 return taskList;
360 }
361
362
363 /*
364 * CheckConstraintNameListForRelation returns a list of names of CHECK constraints
365 * for a relation.
366 */
367 static List *
CheckConstraintNameListForRelation(Oid relationId)368 CheckConstraintNameListForRelation(Oid relationId)
369 {
370 List *constraintNameList = NIL;
371
372 int scanKeyCount = 2;
373 ScanKeyData scanKey[2];
374
375 Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
376
377 ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid,
378 BTEqualStrategyNumber, F_OIDEQ, relationId);
379 ScanKeyInit(&scanKey[1], Anum_pg_constraint_contype,
380 BTEqualStrategyNumber, F_CHAREQ, CONSTRAINT_CHECK);
381
382 bool useIndex = false;
383 SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, InvalidOid, useIndex,
384 NULL, scanKeyCount, scanKey);
385
386 HeapTuple heapTuple = systable_getnext(scanDescriptor);
387 while (HeapTupleIsValid(heapTuple))
388 {
389 Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
390 char *constraintName = NameStr(constraintForm->conname);
391 constraintNameList = lappend(constraintNameList, pstrdup(constraintName));
392
393 heapTuple = systable_getnext(scanDescriptor);
394 }
395
396 systable_endscan(scanDescriptor);
397 table_close(pgConstraint, NoLock);
398
399 return constraintNameList;
400 }
401
402
403 /*
404 * WorkerFixPartitionConstraintCommandList creates a list of queries that will fix
405 * all check constraint names of a shard.
406 */
407 static List *
WorkerFixPartitionConstraintCommandList(Oid relationId,uint64 shardId,List * checkConstraintList)408 WorkerFixPartitionConstraintCommandList(Oid relationId, uint64 shardId,
409 List *checkConstraintList)
410 {
411 List *commandList = NIL;
412 Oid schemaId = get_rel_namespace(relationId);
413 char *schemaName = get_namespace_name(schemaId);
414 char *relationName = get_rel_name(relationId);
415 char *shardRelationName = pstrdup(relationName);
416
417 /* build shard relation name */
418 AppendShardIdToName(&shardRelationName, shardId);
419
420 char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName);
421
422 char *constraintName = NULL;
423 foreach_ptr(constraintName, checkConstraintList)
424 {
425 StringInfo shardQueryString = makeStringInfo();
426 appendStringInfo(shardQueryString,
427 "SELECT worker_fix_pre_citus10_partitioned_table_constraint_names(%s::regclass, "
428 UINT64_FORMAT ", %s::text)",
429 quote_literal_cstr(quotedShardName), shardId,
430 quote_literal_cstr(constraintName));
431 commandList = lappend(commandList, shardQueryString->data);
432 }
433
434 return commandList;
435 }
436
437
438 /*
439 * CreateFixPartitionShardIndexNamesTaskList goes over all the indexes of a distributed
440 * partitioned table, and creates the list of tasks to execute
441 * worker_fix_partition_shard_index_names UDF on worker nodes.
442 *
443 * We create parent_table_shard_count tasks,
444 * each task with parent_indexes_count x parent_partitions_count query strings.
445 */
446 static List *
CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId)447 CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId)
448 {
449 List *taskList = NIL;
450
451 /* enumerate the tasks when putting them to the taskList */
452 int taskId = 1;
453
454 Relation parentRelation = RelationIdGetRelation(parentRelationId);
455
456 List *parentIndexIdList = RelationGetIndexList(parentRelation);
457
458 /* early exit if the parent relation does not have any indexes */
459 if (parentIndexIdList == NIL)
460 {
461 RelationClose(parentRelation);
462 return NIL;
463 }
464
465 List *partitionList = PartitionList(parentRelationId);
466
467 /* early exit if the parent relation does not have any partitions */
468 if (partitionList == NIL)
469 {
470 RelationClose(parentRelation);
471 return NIL;
472 }
473
474 List *parentShardIntervalList = LoadShardIntervalList(parentRelationId);
475
476 /* lock metadata before getting placement lists */
477 LockShardListMetadata(parentShardIntervalList, ShareLock);
478 Oid partitionId = InvalidOid;
479 foreach_oid(partitionId, partitionList)
480 {
481 List *partitionShardIntervalList = LoadShardIntervalList(partitionId);
482 LockShardListMetadata(partitionShardIntervalList, ShareLock);
483 }
484
485 ShardInterval *parentShardInterval = NULL;
486 foreach_ptr(parentShardInterval, parentShardIntervalList)
487 {
488 uint64 parentShardId = parentShardInterval->shardId;
489
490 List *queryStringList = WorkerFixPartitionShardIndexNamesCommandList(
491 parentShardId, parentIndexIdList);
492
493 Task *task = CitusMakeNode(Task);
494 task->jobId = INVALID_JOB_ID;
495 task->taskId = taskId++;
496
497 task->taskType = DDL_TASK;
498 SetTaskQueryStringList(task, queryStringList);
499 task->dependentTaskList = NULL;
500 task->replicationModel = REPLICATION_MODEL_INVALID;
501 task->anchorShardId = parentShardId;
502 task->taskPlacementList = ActiveShardPlacementList(parentShardId);
503
504 taskList = lappend(taskList, task);
505 }
506
507 RelationClose(parentRelation);
508
509 return taskList;
510 }
511
512
513 /*
514 * WorkerFixPartitionShardIndexNamesCommandList creates a list of queries that will fix
515 * all child index names of parent indexes on given shard of parent partitioned table.
516 */
517 static List *
WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,List * parentIndexIdList)518 WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
519 List *parentIndexIdList)
520 {
521 List *commandList = NIL;
522 Oid parentIndexId = InvalidOid;
523 foreach_oid(parentIndexId, parentIndexIdList)
524 {
525 if (!has_subclass(parentIndexId))
526 {
527 continue;
528 }
529
530 /*
531 * Get the qualified name of the corresponding index of given parent index
532 * in the parent shard with given parentShardId
533 */
534 char *parentIndexName = get_rel_name(parentIndexId);
535 char *parentShardIndexName = pstrdup(parentIndexName);
536 AppendShardIdToName(&parentShardIndexName, parentShardId);
537 Oid schemaId = get_rel_namespace(parentIndexId);
538 char *schemaName = get_namespace_name(schemaId);
539 char *qualifiedParentShardIndexName = quote_qualified_identifier(schemaName,
540 parentShardIndexName);
541 List *commands = WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
542 qualifiedParentShardIndexName, parentIndexId);
543 commandList = list_concat(commandList, commands);
544 }
545
546 return commandList;
547 }
548
549
550 /*
551 * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex creates a list of queries that will fix
552 * all child index names of given index on shard of parent partitioned table.
553 */
554 static List *
WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(char * qualifiedParentShardIndexName,Oid parentIndexId)555 WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
556 char *qualifiedParentShardIndexName, Oid parentIndexId)
557 {
558 List *commandList = NIL;
559
560 /*
561 * Get the list of all partition indexes that are children of current
562 * index on parent
563 */
564 List *partitionIndexIds = find_inheritance_children(parentIndexId,
565 ShareRowExclusiveLock);
566 Oid partitionIndexId = InvalidOid;
567 foreach_oid(partitionIndexId, partitionIndexIds)
568 {
569 List *commands = WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(
570 partitionIndexId, qualifiedParentShardIndexName);
571 commandList = list_concat(commandList, commands);
572 }
573 return commandList;
574 }
575
576
577 /*
578 * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex creates a list of queries that will fix
579 * all child index names of given index on shard of parent partitioned table, whose table relation is a shard
580 * of the partition that is the table relation of given partitionIndexId
581 */
582 static List *
WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndexId,char * qualifiedParentShardIndexName)583 WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndexId,
584 char *
585 qualifiedParentShardIndexName)
586 {
587 List *commandList = NIL;
588
589 /* get info for this partition relation of this index*/
590 char *partitionIndexName = get_rel_name(partitionIndexId);
591 Oid partitionId = IndexGetRelation(partitionIndexId, false);
592 char *partitionName = get_rel_name(partitionId);
593 char *partitionSchemaName = get_namespace_name(get_rel_namespace(partitionId));
594 List *partitionShardIntervalList = LoadShardIntervalList(partitionId);
595
596 ShardInterval *partitionShardInterval = NULL;
597 foreach_ptr(partitionShardInterval, partitionShardIntervalList)
598 {
599 /*
600 * Prepare commands for each shard of current partition
601 * to fix the index name that corresponds to the
602 * current parent index name
603 */
604 uint64 partitionShardId = partitionShardInterval->shardId;
605
606 /* get qualified partition shard name */
607 char *partitionShardName = pstrdup(partitionName);
608 AppendShardIdToName(&partitionShardName, partitionShardId);
609 char *qualifiedPartitionShardName = quote_qualified_identifier(
610 partitionSchemaName,
611 partitionShardName);
612
613 /* generate the new correct index name */
614 char *newPartitionShardIndexName = pstrdup(partitionIndexName);
615 AppendShardIdToName(&newPartitionShardIndexName, partitionShardId);
616
617 /* create worker_fix_partition_shard_index_names command */
618 StringInfo shardQueryString = makeStringInfo();
619 appendStringInfo(shardQueryString,
620 "SELECT worker_fix_partition_shard_index_names(%s::regclass, %s, %s)",
621 quote_literal_cstr(qualifiedParentShardIndexName),
622 quote_literal_cstr(qualifiedPartitionShardName),
623 quote_literal_cstr(newPartitionShardIndexName));
624 commandList = lappend(commandList, shardQueryString->data);
625 }
626
627 return commandList;
628 }
629
630
631 /*
632 * RelationHasConstraint checks if a relation has a constraint with a given name.
633 */
634 static bool
RelationHasConstraint(Oid relationId,char * constraintName)635 RelationHasConstraint(Oid relationId, char *constraintName)
636 {
637 bool found = false;
638
639 int scanKeyCount = 2;
640 ScanKeyData scanKey[2];
641
642 Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
643
644 ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid,
645 BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
646 ScanKeyInit(&scanKey[1], Anum_pg_constraint_conname,
647 BTEqualStrategyNumber, F_NAMEEQ, CStringGetDatum(constraintName));
648
649 bool useIndex = false;
650 SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, InvalidOid, useIndex,
651 NULL, scanKeyCount, scanKey);
652
653 HeapTuple heapTuple = systable_getnext(scanDescriptor);
654 if (HeapTupleIsValid(heapTuple))
655 {
656 found = true;
657 }
658
659 systable_endscan(scanDescriptor);
660 table_close(pgConstraint, NoLock);
661
662 return found;
663 }
664
665
666 /*
667 * RenameConstraintCommand creates the query string that will rename a constraint
668 */
669 static char *
RenameConstraintCommand(Oid relationId,char * constraintName,char * newConstraintName)670 RenameConstraintCommand(Oid relationId, char *constraintName, char *newConstraintName)
671 {
672 char *qualifiedRelationName = generate_qualified_relation_name(relationId);
673 const char *quotedConstraintName = quote_identifier(constraintName);
674 const char *quotedNewConstraintName = quote_identifier(newConstraintName);
675
676 StringInfo renameCommand = makeStringInfo();
677 appendStringInfo(renameCommand, "ALTER TABLE %s RENAME CONSTRAINT %s TO %s",
678 qualifiedRelationName, quotedConstraintName,
679 quotedNewConstraintName);
680
681 return renameCommand->data;
682 }
683
684
685 /*
686 * Returns true if the given relation is a partitioned table.
687 */
688 bool
PartitionedTable(Oid relationId)689 PartitionedTable(Oid relationId)
690 {
691 Relation rel = try_relation_open(relationId, AccessShareLock);
692
693 /* don't error out for tables that are dropped */
694 if (rel == NULL)
695 {
696 return false;
697 }
698
699 bool partitionedTable = false;
700
701 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
702 {
703 partitionedTable = true;
704 }
705
706 /* keep the lock */
707 table_close(rel, NoLock);
708
709 return partitionedTable;
710 }
711
712
713 /*
714 * Returns true if the given relation is a partitioned table. The function
715 * doesn't acquire any locks on the input relation, thus the caller is
716 * reponsible for holding the appropriate locks.
717 */
718 bool
PartitionedTableNoLock(Oid relationId)719 PartitionedTableNoLock(Oid relationId)
720 {
721 Relation rel = try_relation_open_nolock(relationId);
722 bool partitionedTable = false;
723
724 /* don't error out for tables that are dropped */
725 if (rel == NULL)
726 {
727 return false;
728 }
729
730 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
731 {
732 partitionedTable = true;
733 }
734
735 /* keep the lock */
736 table_close(rel, NoLock);
737
738 return partitionedTable;
739 }
740
741
742 /*
743 * Returns true if the given relation is a partition.
744 */
745 bool
PartitionTable(Oid relationId)746 PartitionTable(Oid relationId)
747 {
748 Relation rel = try_relation_open(relationId, AccessShareLock);
749
750 /* don't error out for tables that are dropped */
751 if (rel == NULL)
752 {
753 return false;
754 }
755
756 bool partitionTable = rel->rd_rel->relispartition;
757
758 /* keep the lock */
759 table_close(rel, NoLock);
760
761 return partitionTable;
762 }
763
764
765 /*
766 * Returns true if the given relation is a partition. The function
767 * doesn't acquire any locks on the input relation, thus the caller is
768 * reponsible for holding the appropriate locks.
769 */
770 bool
PartitionTableNoLock(Oid relationId)771 PartitionTableNoLock(Oid relationId)
772 {
773 Relation rel = try_relation_open_nolock(relationId);
774
775 /* don't error out for tables that are dropped */
776 if (rel == NULL)
777 {
778 return false;
779 }
780
781 bool partitionTable = rel->rd_rel->relispartition;
782
783 /* keep the lock */
784 table_close(rel, NoLock);
785
786 return partitionTable;
787 }
788
789
790 /*
791 * try_relation_open_nolock opens a relation with given relationId without
792 * acquiring locks. PostgreSQL's try_relation_open() asserts that caller
793 * has already acquired a lock on the relation, which we don't always do.
794 *
795 * ATTENTION:
796 * 1. Sync this with try_relation_open(). It hasn't changed for 10 to 12
797 * releases though.
798 * 2. We should remove this after we fix the locking/distributed deadlock
799 * issues with MX Truncate. See https://github.com/citusdata/citus/pull/2894
800 * for more discussion.
801 */
802 static Relation
try_relation_open_nolock(Oid relationId)803 try_relation_open_nolock(Oid relationId)
804 {
805 if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relationId)))
806 {
807 return NULL;
808 }
809
810 Relation relation = RelationIdGetRelation(relationId);
811 if (!RelationIsValid(relation))
812 {
813 return NULL;
814 }
815
816 pgstat_initstats(relation);
817
818 return relation;
819 }
820
821
822 /*
823 * IsChildTable returns true if the table is inherited. Note that
824 * partition tables inherites by default. However, this function
825 * returns false if the given table is a partition.
826 */
827 bool
IsChildTable(Oid relationId)828 IsChildTable(Oid relationId)
829 {
830 ScanKeyData key[1];
831 HeapTuple inheritsTuple = NULL;
832 bool tableInherits = false;
833
834 Relation pgInherits = table_open(InheritsRelationId, AccessShareLock);
835
836 ScanKeyInit(&key[0], Anum_pg_inherits_inhrelid,
837 BTEqualStrategyNumber, F_OIDEQ,
838 ObjectIdGetDatum(relationId));
839
840 SysScanDesc scan = systable_beginscan(pgInherits, InvalidOid, false,
841 NULL, 1, key);
842
843 while ((inheritsTuple = systable_getnext(scan)) != NULL)
844 {
845 Oid inheritedRelationId =
846 ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhrelid;
847
848 if (relationId == inheritedRelationId)
849 {
850 tableInherits = true;
851 break;
852 }
853 }
854
855 systable_endscan(scan);
856 table_close(pgInherits, AccessShareLock);
857
858 if (tableInherits && PartitionTable(relationId))
859 {
860 tableInherits = false;
861 }
862
863 return tableInherits;
864 }
865
866
867 /*
868 * IsParentTable returns true if the table is inherited. Note that
869 * partitioned tables inherited by default. However, this function
870 * returns false if the given table is a partitioned table.
871 */
872 bool
IsParentTable(Oid relationId)873 IsParentTable(Oid relationId)
874 {
875 ScanKeyData key[1];
876 bool tableInherited = false;
877
878 Relation pgInherits = table_open(InheritsRelationId, AccessShareLock);
879
880 ScanKeyInit(&key[0], Anum_pg_inherits_inhparent,
881 BTEqualStrategyNumber, F_OIDEQ,
882 ObjectIdGetDatum(relationId));
883
884 SysScanDesc scan = systable_beginscan(pgInherits, InheritsParentIndexId, true,
885 NULL, 1, key);
886
887 if (systable_getnext(scan) != NULL)
888 {
889 tableInherited = true;
890 }
891 systable_endscan(scan);
892 table_close(pgInherits, AccessShareLock);
893
894 if (tableInherited && PartitionedTable(relationId))
895 {
896 tableInherited = false;
897 }
898
899 return tableInherited;
900 }
901
902
903 /*
904 * Wrapper around get_partition_parent
905 *
906 * Note: Because this function assumes that the relation whose OID is passed
907 * as an argument will have precisely one parent, it should only be called
908 * when it is known that the relation is a partition.
909 */
910 Oid
PartitionParentOid(Oid partitionOid)911 PartitionParentOid(Oid partitionOid)
912 {
913 Oid partitionParentOid = get_partition_parent_compat(partitionOid, false);
914
915 return partitionParentOid;
916 }
917
918
919 /*
920 * PartitionWithLongestNameRelationId is a utility function that returns the
921 * oid of the partition table that has the longest name in terms of number of
922 * characters.
923 */
924 Oid
PartitionWithLongestNameRelationId(Oid parentRelationId)925 PartitionWithLongestNameRelationId(Oid parentRelationId)
926 {
927 Oid longestNamePartitionId = InvalidOid;
928 int longestNameLength = 0;
929 List *partitionList = PartitionList(parentRelationId);
930
931 Oid partitionRelationId = InvalidOid;
932 foreach_oid(partitionRelationId, partitionList)
933 {
934 char *partitionName = get_rel_name(partitionRelationId);
935 int partitionNameLength = strnlen(partitionName, NAMEDATALEN);
936 if (partitionNameLength > longestNameLength)
937 {
938 longestNamePartitionId = partitionRelationId;
939 longestNameLength = partitionNameLength;
940 }
941 }
942
943 return longestNamePartitionId;
944 }
945
946
947 /*
948 * Takes a parent relation and returns Oid list of its partitions. The
949 * function errors out if the given relation is not a parent.
950 */
951 List *
PartitionList(Oid parentRelationId)952 PartitionList(Oid parentRelationId)
953 {
954 Relation rel = table_open(parentRelationId, AccessShareLock);
955 List *partitionList = NIL;
956
957
958 if (!PartitionedTable(parentRelationId))
959 {
960 char *relationName = get_rel_name(parentRelationId);
961
962 ereport(ERROR, (errmsg("\"%s\" is not a parent table", relationName)));
963 }
964 PartitionDesc partDesc = RelationGetPartitionDesc_compat(rel, true);
965 Assert(partDesc != NULL);
966
967 int partitionCount = partDesc->nparts;
968 for (int partitionIndex = 0; partitionIndex < partitionCount; ++partitionIndex)
969 {
970 partitionList =
971 lappend_oid(partitionList, partDesc->oids[partitionIndex]);
972 }
973
974 /* keep the lock */
975 table_close(rel, NoLock);
976
977 return partitionList;
978 }
979
980
981 /*
982 * GenerateDetachPartitionCommand gets a partition table and returns
983 * "ALTER TABLE parent_table DETACH PARTITION partitionName" command.
984 */
985 char *
GenerateDetachPartitionCommand(Oid partitionTableId)986 GenerateDetachPartitionCommand(Oid partitionTableId)
987 {
988 StringInfo detachPartitionCommand = makeStringInfo();
989
990 if (!PartitionTable(partitionTableId))
991 {
992 char *relationName = get_rel_name(partitionTableId);
993
994 ereport(ERROR, (errmsg("\"%s\" is not a partition", relationName)));
995 }
996
997 Oid parentId = get_partition_parent_compat(partitionTableId, false);
998 char *tableQualifiedName = generate_qualified_relation_name(partitionTableId);
999 char *parentTableQualifiedName = generate_qualified_relation_name(parentId);
1000
1001 appendStringInfo(detachPartitionCommand,
1002 "ALTER TABLE IF EXISTS %s DETACH PARTITION %s;",
1003 parentTableQualifiedName, tableQualifiedName);
1004
1005 return detachPartitionCommand->data;
1006 }
1007
1008
1009 /*
1010 * GenereatePartitioningInformation returns the partitioning type and partition column
1011 * for the given parent table in the form of "PARTITION TYPE (partitioning column(s)/expression(s))".
1012 */
1013 char *
GeneratePartitioningInformation(Oid parentTableId)1014 GeneratePartitioningInformation(Oid parentTableId)
1015 {
1016 char *partitionBoundCString = "";
1017
1018 if (!PartitionedTable(parentTableId))
1019 {
1020 char *relationName = get_rel_name(parentTableId);
1021
1022 ereport(ERROR, (errmsg("\"%s\" is not a parent table", relationName)));
1023 }
1024
1025 Datum partitionBoundDatum = DirectFunctionCall1(pg_get_partkeydef,
1026 ObjectIdGetDatum(parentTableId));
1027
1028 partitionBoundCString = TextDatumGetCString(partitionBoundDatum);
1029
1030 return partitionBoundCString;
1031 }
1032
1033
1034 /*
1035 * GenerateAttachShardPartitionCommand generates command to attach a child table
1036 * table to its parent in a partitioning hierarchy.
1037 */
1038 char *
GenerateAttachShardPartitionCommand(ShardInterval * shardInterval)1039 GenerateAttachShardPartitionCommand(ShardInterval *shardInterval)
1040 {
1041 Oid schemaId = get_rel_namespace(shardInterval->relationId);
1042 char *schemaName = get_namespace_name(schemaId);
1043 char *escapedSchemaName = quote_literal_cstr(schemaName);
1044
1045 char *command = GenerateAlterTableAttachPartitionCommand(shardInterval->relationId);
1046 char *escapedCommand = quote_literal_cstr(command);
1047 int shardIndex = ShardIndex(shardInterval);
1048
1049
1050 StringInfo attachPartitionCommand = makeStringInfo();
1051
1052 Oid parentRelationId = PartitionParentOid(shardInterval->relationId);
1053 if (parentRelationId == InvalidOid)
1054 {
1055 ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1056 errmsg("cannot attach partition"),
1057 errdetail("Referenced relation cannot be found.")));
1058 }
1059
1060 Oid parentSchemaId = get_rel_namespace(parentRelationId);
1061 char *parentSchemaName = get_namespace_name(parentSchemaId);
1062 char *escapedParentSchemaName = quote_literal_cstr(parentSchemaName);
1063 uint64 parentShardId = ColocatedShardIdInRelation(parentRelationId, shardIndex);
1064
1065 appendStringInfo(attachPartitionCommand,
1066 WORKER_APPLY_INTER_SHARD_DDL_COMMAND, parentShardId,
1067 escapedParentSchemaName, shardInterval->shardId,
1068 escapedSchemaName, escapedCommand);
1069
1070 return attachPartitionCommand->data;
1071 }
1072
1073
1074 /*
1075 * GenerateAlterTableAttachPartitionCommand returns the necessary command to
1076 * attach the given partition to its parent.
1077 */
1078 char *
GenerateAlterTableAttachPartitionCommand(Oid partitionTableId)1079 GenerateAlterTableAttachPartitionCommand(Oid partitionTableId)
1080 {
1081 StringInfo createPartitionCommand = makeStringInfo();
1082
1083
1084 if (!PartitionTable(partitionTableId))
1085 {
1086 char *relationName = get_rel_name(partitionTableId);
1087
1088 ereport(ERROR, (errmsg("\"%s\" is not a partition", relationName)));
1089 }
1090
1091 Oid parentId = get_partition_parent_compat(partitionTableId, false);
1092 char *tableQualifiedName = generate_qualified_relation_name(partitionTableId);
1093 char *parentTableQualifiedName = generate_qualified_relation_name(parentId);
1094
1095 char *partitionBoundCString = PartitionBound(partitionTableId);
1096
1097 appendStringInfo(createPartitionCommand, "ALTER TABLE %s ATTACH PARTITION %s %s;",
1098 parentTableQualifiedName, tableQualifiedName,
1099 partitionBoundCString);
1100
1101 return createPartitionCommand->data;
1102 }
1103
1104
1105 /*
1106 * This function heaviliy inspired from RelationBuildPartitionDesc()
1107 * which is avaliable in src/backend/catalog/partition.c.
1108 *
1109 * The function simply reads the pg_class and gets the partition bound.
1110 * Later, converts it to text format and returns.
1111 */
1112 static char *
PartitionBound(Oid partitionId)1113 PartitionBound(Oid partitionId)
1114 {
1115 bool isnull = false;
1116
1117 HeapTuple tuple = SearchSysCache1(RELOID, partitionId);
1118 if (!HeapTupleIsValid(tuple))
1119 {
1120 elog(ERROR, "cache lookup failed for relation %u", partitionId);
1121 }
1122
1123 /*
1124 * It is possible that the pg_class tuple of a partition has not been
1125 * updated yet to set its relpartbound field. The only case where
1126 * this happens is when we open the parent relation to check using its
1127 * partition descriptor that a new partition's bound does not overlap
1128 * some existing partition.
1129 */
1130 if (!((Form_pg_class) GETSTRUCT(tuple))->relispartition)
1131 {
1132 ReleaseSysCache(tuple);
1133 return "";
1134 }
1135
1136 Datum datum = SysCacheGetAttr(RELOID, tuple,
1137 Anum_pg_class_relpartbound,
1138 &isnull);
1139 Assert(!isnull);
1140
1141 Datum partitionBoundDatum =
1142 DirectFunctionCall2(pg_get_expr, datum, ObjectIdGetDatum(partitionId));
1143
1144 char *partitionBoundString = TextDatumGetCString(partitionBoundDatum);
1145
1146 ReleaseSysCache(tuple);
1147
1148 return partitionBoundString;
1149 }
1150