1 /*
2  * multi_partitioning_utils.c
3  *	  Utility functions for declarative partitioning
4  *
5  * Copyright (c) Citus Data, Inc.
6  */
7 #include "postgres.h"
8 
9 #include "distributed/pg_version_constants.h"
10 
11 #include "access/genam.h"
12 #include "access/heapam.h"
13 #include "access/htup_details.h"
14 #include "catalog/index.h"
15 #include "catalog/indexing.h"
16 #include "catalog/partition.h"
17 #include "catalog/pg_class.h"
18 #include "catalog/pg_constraint.h"
19 #include "catalog/pg_inherits.h"
20 #include "commands/tablecmds.h"
21 #include "common/string.h"
22 #include "distributed/citus_nodes.h"
23 #include "distributed/adaptive_executor.h"
24 #include "distributed/citus_ruleutils.h"
25 #include "distributed/colocation_utils.h"
26 #include "distributed/commands.h"
27 #include "distributed/coordinator_protocol.h"
28 #include "distributed/deparse_shard_query.h"
29 #include "distributed/listutils.h"
30 #include "distributed/metadata_utility.h"
31 #include "distributed/multi_executor.h"
32 #include "distributed/multi_partitioning_utils.h"
33 #include "distributed/multi_physical_planner.h"
34 #include "distributed/relay_utility.h"
35 #include "distributed/resource_lock.h"
36 #include "distributed/shardinterval_utils.h"
37 #include "distributed/version_compat.h"
38 #include "distributed/worker_protocol.h"
39 #include "lib/stringinfo.h"
40 #include "nodes/makefuncs.h"
41 #include "nodes/pg_list.h"
42 #include "pgstat.h"
43 #include "partitioning/partdesc.h"
44 #include "utils/builtins.h"
45 #include "utils/fmgroids.h"
46 #include "utils/lsyscache.h"
47 #include "utils/rel.h"
48 #include "utils/syscache.h"
49 #include "utils/varlena.h"
50 
/* forward declarations of file-local (static) helpers defined below */
static char * PartitionBound(Oid partitionId);
static Relation try_relation_open_nolock(Oid relationId);
static List * CreateFixPartitionConstraintsTaskList(Oid relationId);
static List * WorkerFixPartitionConstraintCommandList(Oid relationId, uint64 shardId,
													  List *checkConstraintList);
static List * CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId);
static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
														   List *indexIdList);
static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
	char *qualifiedParentShardIndexName, Oid parentIndexId);
static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid
																			partitionIndexId,
																			char *
																			qualifiedParentShardIndexName);
static List * CheckConstraintNameListForRelation(Oid relationId);
static bool RelationHasConstraint(Oid relationId, char *constraintName);
static char * RenameConstraintCommand(Oid relationId, char *constraintName,
									  char *newConstraintName);


/* version-1 calling convention declarations for the SQL-callable UDFs in this file */
PG_FUNCTION_INFO_V1(fix_pre_citus10_partitioned_table_constraint_names);
PG_FUNCTION_INFO_V1(worker_fix_pre_citus10_partitioned_table_constraint_names);
PG_FUNCTION_INFO_V1(fix_partition_shard_index_names);
PG_FUNCTION_INFO_V1(worker_fix_partition_shard_index_names);
75 
76 
77 /*
78  * fix_pre_citus10_partitioned_table_constraint_names fixes the constraint names of
79  * partitioned table shards on workers.
80  *
81  * Constraint names for partitioned table shards should have shardId suffixes if and only
82  * if they are unique or foreign key constraints. We mistakenly appended shardIds to
83  * constraint names on ALTER TABLE dist_part_table ADD CONSTRAINT .. queries prior to
84  * Citus 10. fix_pre_citus10_partitioned_table_constraint_names determines if this is the
85  * case, and renames constraints back to their original names on shards.
86  */
87 Datum
fix_pre_citus10_partitioned_table_constraint_names(PG_FUNCTION_ARGS)88 fix_pre_citus10_partitioned_table_constraint_names(PG_FUNCTION_ARGS)
89 {
90 	Oid relationId = PG_GETARG_OID(0);
91 	EnsureCoordinator();
92 
93 	if (!PartitionedTable(relationId))
94 	{
95 		ereport(ERROR, (errmsg("could not fix partition constraints: "
96 							   "relation does not exist or is not partitioned")));
97 	}
98 	if (!IsCitusTable(relationId))
99 	{
100 		ereport(ERROR, (errmsg("fix_pre_citus10_partitioned_table_constraint_names can "
101 							   "only be called for distributed partitioned tables")));
102 	}
103 
104 	List *taskList = CreateFixPartitionConstraintsTaskList(relationId);
105 
106 	/* do not do anything if there are no constraints that should be fixed */
107 	if (taskList != NIL)
108 	{
109 		bool localExecutionSupported = true;
110 		ExecuteUtilityTaskList(taskList, localExecutionSupported);
111 	}
112 
113 	PG_RETURN_VOID();
114 }
115 
116 
117 /*
118  * worker_fix_pre_citus10_partitioned_table_constraint_names fixes the constraint names on a worker given a shell
119  * table name and shard id.
120  */
121 Datum
worker_fix_pre_citus10_partitioned_table_constraint_names(PG_FUNCTION_ARGS)122 worker_fix_pre_citus10_partitioned_table_constraint_names(PG_FUNCTION_ARGS)
123 {
124 	Oid relationId = PG_GETARG_OID(0);
125 	int64 shardId = PG_GETARG_INT32(1);
126 	text *constraintNameText = PG_GETARG_TEXT_P(2);
127 
128 	if (!PartitionedTable(relationId))
129 	{
130 		ereport(ERROR, (errmsg("could not fix partition constraints: "
131 							   "relation does not exist or is not partitioned")));
132 	}
133 
134 	char *constraintName = text_to_cstring(constraintNameText);
135 	char *shardIdAppendedConstraintName = pstrdup(constraintName);
136 	AppendShardIdToName(&shardIdAppendedConstraintName, shardId);
137 
138 	/* if shardId was appended to the constraint name, rename back to original */
139 	if (RelationHasConstraint(relationId, shardIdAppendedConstraintName))
140 	{
141 		char *renameConstraintDDLCommand =
142 			RenameConstraintCommand(relationId, shardIdAppendedConstraintName,
143 									constraintName);
144 		ExecuteAndLogUtilityCommand(renameConstraintDDLCommand);
145 	}
146 	PG_RETURN_VOID();
147 }
148 
149 
150 /*
151  * fix_partition_shard_index_names fixes the index names of shards of partitions of
152  * partitioned tables on workers.
153  *
154  * When running CREATE INDEX on parent_table, we didn't explicitly create the index on
155  * each partition as well. Postgres created indexes for partitions in the coordinator,
156  * and also in the workers. Actually, Postgres auto-generates index names when auto-creating
157  * indexes on each partition shard of the parent shards. If index name is too long, it
158  * truncates the name and adds _idx postfix to it. However, when truncating the name, the
159  * shardId of the partition shard can be lost. This may result in the same index name used for
160  * the partition shell table and one of the partition shards.
161  * For more details, check issue #4962 https://github.com/citusdata/citus/issues/4962
162  *
163  * fix_partition_shard_index_names renames indexes of shards of partition tables to include
164  * the shardId at the end of the name, regardless of whether index name was long or short
165  * As a result there will be no index name ending in _idx, rather all will end in _{shardid}
166  * Algorithm is:
167  * foreach parentShard in shardListOfParentTableId:
168  *  foreach parentIndex on parent:
169  *      generate qualifiedParentShardIndexName -> parentShardIndex
170  *      foreach inheritedPartitionIndex on parentIndex:
171  *          get table relation of inheritedPartitionIndex -> partitionId
172  *          foreach partitionShard in shardListOfPartitionid:
173  *              generate qualifiedPartitionShardName -> partitionShard
174  *              generate newPartitionShardIndexName
175  *              (the following happens in the worker node)
176  *              foreach inheritedPartitionShardIndex on parentShardIndex:
177  *                  if table relation of inheritedPartitionShardIndex is partitionShard:
178  *                      if inheritedPartitionShardIndex does not have proper name:
179  *                          Rename(inheritedPartitionShardIndex, newPartitionShardIndexName)
180  *                      break
181  */
182 Datum
fix_partition_shard_index_names(PG_FUNCTION_ARGS)183 fix_partition_shard_index_names(PG_FUNCTION_ARGS)
184 {
185 	CheckCitusVersion(ERROR);
186 	EnsureCoordinator();
187 
188 	Oid relationId = PG_GETARG_OID(0);
189 
190 	Relation relation = try_relation_open(relationId, AccessExclusiveLock);
191 
192 	if (relation == NULL)
193 	{
194 		ereport(NOTICE, (errmsg("relation with OID %u does not exist, skipping",
195 								relationId)));
196 		PG_RETURN_VOID();
197 	}
198 
199 	if (relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
200 	{
201 		relation_close(relation, NoLock);
202 		ereport(ERROR, (errmsg(
203 							"Fixing shard index names is only applicable to partitioned"
204 							" tables, and \"%s\" is not a partitioned table",
205 							RelationGetRelationName(relation))));
206 	}
207 
208 	if (!IsCitusTable(relationId))
209 	{
210 		relation_close(relation, NoLock);
211 		ereport(ERROR, (errmsg("fix_partition_shard_index_names can "
212 							   "only be called for distributed partitioned tables")));
213 	}
214 
215 	EnsureTableOwner(relationId);
216 
217 	List *taskList = CreateFixPartitionShardIndexNamesTaskList(relationId);
218 
219 	/* do not do anything if there are no index names to fix */
220 	if (taskList != NIL)
221 	{
222 		bool localExecutionSupported = true;
223 		RowModifyLevel modLevel = ROW_MODIFY_NONE;
224 		ExecutionParams *execParams = CreateBasicExecutionParams(modLevel, taskList,
225 																 MaxAdaptiveExecutorPoolSize,
226 																 localExecutionSupported);
227 		ExecuteTaskListExtended(execParams);
228 	}
229 
230 	relation_close(relation, NoLock);
231 
232 	PG_RETURN_VOID();
233 }
234 
235 
236 /*
237  * worker_fix_partition_shard_index_names fixes the index name of the index on given
238  * partition shard that has parent the given parent index.
239  * The parent index should be the index of a shard of a distributed partitioned table.
240  */
241 Datum
worker_fix_partition_shard_index_names(PG_FUNCTION_ARGS)242 worker_fix_partition_shard_index_names(PG_FUNCTION_ARGS)
243 {
244 	Oid parentShardIndexId = PG_GETARG_OID(0);
245 
246 	text *partitionShardName = PG_GETARG_TEXT_P(1);
247 
248 	/* resolve partitionShardId from passed in schema and partition shard name */
249 	List *partitionShardNameList = textToQualifiedNameList(partitionShardName);
250 	RangeVar *partitionShard = makeRangeVarFromNameList(partitionShardNameList);
251 
252 	/* lock the relation with the lock mode */
253 	bool missing_ok = true;
254 	Oid partitionShardId = RangeVarGetRelid(partitionShard, NoLock, missing_ok);
255 
256 	if (!OidIsValid(partitionShardId))
257 	{
258 		PG_RETURN_VOID();
259 	}
260 
261 	CheckCitusVersion(ERROR);
262 	EnsureTableOwner(partitionShardId);
263 
264 	text *newPartitionShardIndexNameText = PG_GETARG_TEXT_P(2);
265 	char *newPartitionShardIndexName = text_to_cstring(
266 		newPartitionShardIndexNameText);
267 
268 	if (!has_subclass(parentShardIndexId))
269 	{
270 		ereport(ERROR, (errmsg("could not fix child index names: "
271 							   "index is not partitioned")));
272 	}
273 
274 	List *partitionShardIndexIds = find_inheritance_children(parentShardIndexId,
275 															 ShareRowExclusiveLock);
276 	Oid partitionShardIndexId = InvalidOid;
277 	foreach_oid(partitionShardIndexId, partitionShardIndexIds)
278 	{
279 		if (IndexGetRelation(partitionShardIndexId, false) == partitionShardId)
280 		{
281 			char *partitionShardIndexName = get_rel_name(partitionShardIndexId);
282 			if (ExtractShardIdFromTableName(partitionShardIndexName, missing_ok) ==
283 				INVALID_SHARD_ID)
284 			{
285 				/*
286 				 * ExtractShardIdFromTableName will return INVALID_SHARD_ID if
287 				 * partitionShardIndexName doesn't end in _shardid. In that case,
288 				 * we want to rename this partition shard index to newPartitionShardIndexName,
289 				 * which ends in _shardid, hence we maintain naming consistency:
290 				 * we can reach this partition shard index by conventional Citus naming
291 				 */
292 				RenameStmt *stmt = makeNode(RenameStmt);
293 
294 				stmt->renameType = OBJECT_INDEX;
295 				stmt->missing_ok = false;
296 				char *idxNamespace = get_namespace_name(get_rel_namespace(
297 															partitionShardIndexId));
298 				stmt->relation = makeRangeVar(idxNamespace, partitionShardIndexName, -1);
299 				stmt->newname = newPartitionShardIndexName;
300 
301 				RenameRelation(stmt);
302 			}
303 			break;
304 		}
305 	}
306 
307 	PG_RETURN_VOID();
308 }
309 
310 
311 /*
312  * CreateFixPartitionConstraintsTaskList goes over all the partitions of a distributed
313  * partitioned table, and creates the list of tasks to execute
314  * worker_fix_pre_citus10_partitioned_table_constraint_names UDF on worker nodes.
315  */
316 static List *
CreateFixPartitionConstraintsTaskList(Oid relationId)317 CreateFixPartitionConstraintsTaskList(Oid relationId)
318 {
319 	List *taskList = NIL;
320 
321 	/* enumerate the tasks when putting them to the taskList */
322 	int taskId = 1;
323 	List *checkConstraintList = CheckConstraintNameListForRelation(relationId);
324 
325 	/* early exit if the relation does not have any check constraints */
326 	if (checkConstraintList == NIL)
327 	{
328 		return NIL;
329 	}
330 
331 	List *shardIntervalList = LoadShardIntervalList(relationId);
332 
333 	/* lock metadata before getting placement lists */
334 	LockShardListMetadata(shardIntervalList, ShareLock);
335 
336 	ShardInterval *shardInterval = NULL;
337 	foreach_ptr(shardInterval, shardIntervalList)
338 	{
339 		uint64 shardId = shardInterval->shardId;
340 
341 		List *queryStringList = WorkerFixPartitionConstraintCommandList(relationId,
342 																		shardId,
343 																		checkConstraintList);
344 
345 		Task *task = CitusMakeNode(Task);
346 		task->jobId = INVALID_JOB_ID;
347 		task->taskId = taskId++;
348 
349 		task->taskType = DDL_TASK;
350 		SetTaskQueryStringList(task, queryStringList);
351 		task->dependentTaskList = NULL;
352 		task->replicationModel = REPLICATION_MODEL_INVALID;
353 		task->anchorShardId = shardId;
354 		task->taskPlacementList = ActiveShardPlacementList(shardId);
355 
356 		taskList = lappend(taskList, task);
357 	}
358 
359 	return taskList;
360 }
361 
362 
363 /*
364  * CheckConstraintNameListForRelation returns a list of names of CHECK constraints
365  * for a relation.
366  */
367 static List *
CheckConstraintNameListForRelation(Oid relationId)368 CheckConstraintNameListForRelation(Oid relationId)
369 {
370 	List *constraintNameList = NIL;
371 
372 	int scanKeyCount = 2;
373 	ScanKeyData scanKey[2];
374 
375 	Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
376 
377 	ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid,
378 				BTEqualStrategyNumber, F_OIDEQ, relationId);
379 	ScanKeyInit(&scanKey[1], Anum_pg_constraint_contype,
380 				BTEqualStrategyNumber, F_CHAREQ, CONSTRAINT_CHECK);
381 
382 	bool useIndex = false;
383 	SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, InvalidOid, useIndex,
384 													NULL, scanKeyCount, scanKey);
385 
386 	HeapTuple heapTuple = systable_getnext(scanDescriptor);
387 	while (HeapTupleIsValid(heapTuple))
388 	{
389 		Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
390 		char *constraintName = NameStr(constraintForm->conname);
391 		constraintNameList = lappend(constraintNameList, pstrdup(constraintName));
392 
393 		heapTuple = systable_getnext(scanDescriptor);
394 	}
395 
396 	systable_endscan(scanDescriptor);
397 	table_close(pgConstraint, NoLock);
398 
399 	return constraintNameList;
400 }
401 
402 
403 /*
404  * WorkerFixPartitionConstraintCommandList creates a list of queries that will fix
405  * all check constraint names of a shard.
406  */
407 static List *
WorkerFixPartitionConstraintCommandList(Oid relationId,uint64 shardId,List * checkConstraintList)408 WorkerFixPartitionConstraintCommandList(Oid relationId, uint64 shardId,
409 										List *checkConstraintList)
410 {
411 	List *commandList = NIL;
412 	Oid schemaId = get_rel_namespace(relationId);
413 	char *schemaName = get_namespace_name(schemaId);
414 	char *relationName = get_rel_name(relationId);
415 	char *shardRelationName = pstrdup(relationName);
416 
417 	/* build shard relation name */
418 	AppendShardIdToName(&shardRelationName, shardId);
419 
420 	char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName);
421 
422 	char *constraintName = NULL;
423 	foreach_ptr(constraintName, checkConstraintList)
424 	{
425 		StringInfo shardQueryString = makeStringInfo();
426 		appendStringInfo(shardQueryString,
427 						 "SELECT worker_fix_pre_citus10_partitioned_table_constraint_names(%s::regclass, "
428 						 UINT64_FORMAT ", %s::text)",
429 						 quote_literal_cstr(quotedShardName), shardId,
430 						 quote_literal_cstr(constraintName));
431 		commandList = lappend(commandList, shardQueryString->data);
432 	}
433 
434 	return commandList;
435 }
436 
437 
438 /*
439  * CreateFixPartitionShardIndexNamesTaskList goes over all the indexes of a distributed
440  * partitioned table, and creates the list of tasks to execute
441  * worker_fix_partition_shard_index_names UDF on worker nodes.
442  *
443  * We create parent_table_shard_count tasks,
444  * each task with parent_indexes_count x parent_partitions_count query strings.
445  */
446 static List *
CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId)447 CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId)
448 {
449 	List *taskList = NIL;
450 
451 	/* enumerate the tasks when putting them to the taskList */
452 	int taskId = 1;
453 
454 	Relation parentRelation = RelationIdGetRelation(parentRelationId);
455 
456 	List *parentIndexIdList = RelationGetIndexList(parentRelation);
457 
458 	/* early exit if the parent relation does not have any indexes */
459 	if (parentIndexIdList == NIL)
460 	{
461 		RelationClose(parentRelation);
462 		return NIL;
463 	}
464 
465 	List *partitionList = PartitionList(parentRelationId);
466 
467 	/* early exit if the parent relation does not have any partitions */
468 	if (partitionList == NIL)
469 	{
470 		RelationClose(parentRelation);
471 		return NIL;
472 	}
473 
474 	List *parentShardIntervalList = LoadShardIntervalList(parentRelationId);
475 
476 	/* lock metadata before getting placement lists */
477 	LockShardListMetadata(parentShardIntervalList, ShareLock);
478 	Oid partitionId = InvalidOid;
479 	foreach_oid(partitionId, partitionList)
480 	{
481 		List *partitionShardIntervalList = LoadShardIntervalList(partitionId);
482 		LockShardListMetadata(partitionShardIntervalList, ShareLock);
483 	}
484 
485 	ShardInterval *parentShardInterval = NULL;
486 	foreach_ptr(parentShardInterval, parentShardIntervalList)
487 	{
488 		uint64 parentShardId = parentShardInterval->shardId;
489 
490 		List *queryStringList = WorkerFixPartitionShardIndexNamesCommandList(
491 			parentShardId, parentIndexIdList);
492 
493 		Task *task = CitusMakeNode(Task);
494 		task->jobId = INVALID_JOB_ID;
495 		task->taskId = taskId++;
496 
497 		task->taskType = DDL_TASK;
498 		SetTaskQueryStringList(task, queryStringList);
499 		task->dependentTaskList = NULL;
500 		task->replicationModel = REPLICATION_MODEL_INVALID;
501 		task->anchorShardId = parentShardId;
502 		task->taskPlacementList = ActiveShardPlacementList(parentShardId);
503 
504 		taskList = lappend(taskList, task);
505 	}
506 
507 	RelationClose(parentRelation);
508 
509 	return taskList;
510 }
511 
512 
513 /*
514  * WorkerFixPartitionShardIndexNamesCommandList creates a list of queries that will fix
515  * all child index names of parent indexes on given shard of parent partitioned table.
516  */
517 static List *
WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,List * parentIndexIdList)518 WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
519 											 List *parentIndexIdList)
520 {
521 	List *commandList = NIL;
522 	Oid parentIndexId = InvalidOid;
523 	foreach_oid(parentIndexId, parentIndexIdList)
524 	{
525 		if (!has_subclass(parentIndexId))
526 		{
527 			continue;
528 		}
529 
530 		/*
531 		 * Get the qualified name of the corresponding index of given parent index
532 		 * in the parent shard with given parentShardId
533 		 */
534 		char *parentIndexName = get_rel_name(parentIndexId);
535 		char *parentShardIndexName = pstrdup(parentIndexName);
536 		AppendShardIdToName(&parentShardIndexName, parentShardId);
537 		Oid schemaId = get_rel_namespace(parentIndexId);
538 		char *schemaName = get_namespace_name(schemaId);
539 		char *qualifiedParentShardIndexName = quote_qualified_identifier(schemaName,
540 																		 parentShardIndexName);
541 		List *commands = WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
542 			qualifiedParentShardIndexName, parentIndexId);
543 		commandList = list_concat(commandList, commands);
544 	}
545 
546 	return commandList;
547 }
548 
549 
550 /*
551  * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex creates a list of queries that will fix
552  * all child index names of given index on shard of parent partitioned table.
553  */
554 static List *
WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(char * qualifiedParentShardIndexName,Oid parentIndexId)555 WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
556 	char *qualifiedParentShardIndexName, Oid parentIndexId)
557 {
558 	List *commandList = NIL;
559 
560 	/*
561 	 * Get the list of all partition indexes that are children of current
562 	 * index on parent
563 	 */
564 	List *partitionIndexIds = find_inheritance_children(parentIndexId,
565 														ShareRowExclusiveLock);
566 	Oid partitionIndexId = InvalidOid;
567 	foreach_oid(partitionIndexId, partitionIndexIds)
568 	{
569 		List *commands = WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(
570 			partitionIndexId, qualifiedParentShardIndexName);
571 		commandList = list_concat(commandList, commands);
572 	}
573 	return commandList;
574 }
575 
576 
577 /*
578  * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex creates a list of queries that will fix
579  * all child index names of given index on shard of parent partitioned table, whose table relation is a shard
580  * of the partition that is the table relation of given partitionIndexId
581  */
582 static List *
WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndexId,char * qualifiedParentShardIndexName)583 WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid partitionIndexId,
584 															  char *
585 															  qualifiedParentShardIndexName)
586 {
587 	List *commandList = NIL;
588 
589 	/* get info for this partition relation of this index*/
590 	char *partitionIndexName = get_rel_name(partitionIndexId);
591 	Oid partitionId = IndexGetRelation(partitionIndexId, false);
592 	char *partitionName = get_rel_name(partitionId);
593 	char *partitionSchemaName = get_namespace_name(get_rel_namespace(partitionId));
594 	List *partitionShardIntervalList = LoadShardIntervalList(partitionId);
595 
596 	ShardInterval *partitionShardInterval = NULL;
597 	foreach_ptr(partitionShardInterval, partitionShardIntervalList)
598 	{
599 		/*
600 		 * Prepare commands for each shard of current partition
601 		 * to fix the index name that corresponds to the
602 		 * current parent index name
603 		 */
604 		uint64 partitionShardId = partitionShardInterval->shardId;
605 
606 		/* get qualified partition shard name */
607 		char *partitionShardName = pstrdup(partitionName);
608 		AppendShardIdToName(&partitionShardName, partitionShardId);
609 		char *qualifiedPartitionShardName = quote_qualified_identifier(
610 			partitionSchemaName,
611 			partitionShardName);
612 
613 		/* generate the new correct index name */
614 		char *newPartitionShardIndexName = pstrdup(partitionIndexName);
615 		AppendShardIdToName(&newPartitionShardIndexName, partitionShardId);
616 
617 		/* create worker_fix_partition_shard_index_names command */
618 		StringInfo shardQueryString = makeStringInfo();
619 		appendStringInfo(shardQueryString,
620 						 "SELECT worker_fix_partition_shard_index_names(%s::regclass, %s, %s)",
621 						 quote_literal_cstr(qualifiedParentShardIndexName),
622 						 quote_literal_cstr(qualifiedPartitionShardName),
623 						 quote_literal_cstr(newPartitionShardIndexName));
624 		commandList = lappend(commandList, shardQueryString->data);
625 	}
626 
627 	return commandList;
628 }
629 
630 
631 /*
632  * RelationHasConstraint checks if a relation has a constraint with a given name.
633  */
634 static bool
RelationHasConstraint(Oid relationId,char * constraintName)635 RelationHasConstraint(Oid relationId, char *constraintName)
636 {
637 	bool found = false;
638 
639 	int scanKeyCount = 2;
640 	ScanKeyData scanKey[2];
641 
642 	Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
643 
644 	ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid,
645 				BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
646 	ScanKeyInit(&scanKey[1], Anum_pg_constraint_conname,
647 				BTEqualStrategyNumber, F_NAMEEQ, CStringGetDatum(constraintName));
648 
649 	bool useIndex = false;
650 	SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, InvalidOid, useIndex,
651 													NULL, scanKeyCount, scanKey);
652 
653 	HeapTuple heapTuple = systable_getnext(scanDescriptor);
654 	if (HeapTupleIsValid(heapTuple))
655 	{
656 		found = true;
657 	}
658 
659 	systable_endscan(scanDescriptor);
660 	table_close(pgConstraint, NoLock);
661 
662 	return found;
663 }
664 
665 
666 /*
667  * RenameConstraintCommand creates the query string that will rename a constraint
668  */
669 static char *
RenameConstraintCommand(Oid relationId,char * constraintName,char * newConstraintName)670 RenameConstraintCommand(Oid relationId, char *constraintName, char *newConstraintName)
671 {
672 	char *qualifiedRelationName = generate_qualified_relation_name(relationId);
673 	const char *quotedConstraintName = quote_identifier(constraintName);
674 	const char *quotedNewConstraintName = quote_identifier(newConstraintName);
675 
676 	StringInfo renameCommand = makeStringInfo();
677 	appendStringInfo(renameCommand, "ALTER TABLE %s RENAME CONSTRAINT %s TO %s",
678 					 qualifiedRelationName, quotedConstraintName,
679 					 quotedNewConstraintName);
680 
681 	return renameCommand->data;
682 }
683 
684 
685 /*
686  * Returns true if the given relation is a partitioned table.
687  */
688 bool
PartitionedTable(Oid relationId)689 PartitionedTable(Oid relationId)
690 {
691 	Relation rel = try_relation_open(relationId, AccessShareLock);
692 
693 	/* don't error out for tables that are dropped */
694 	if (rel == NULL)
695 	{
696 		return false;
697 	}
698 
699 	bool partitionedTable = false;
700 
701 	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
702 	{
703 		partitionedTable = true;
704 	}
705 
706 	/* keep the lock */
707 	table_close(rel, NoLock);
708 
709 	return partitionedTable;
710 }
711 
712 
713 /*
714  * Returns true if the given relation is a partitioned table. The function
715  * doesn't acquire any locks on the input relation, thus the caller is
716  * reponsible for holding the appropriate locks.
717  */
718 bool
PartitionedTableNoLock(Oid relationId)719 PartitionedTableNoLock(Oid relationId)
720 {
721 	Relation rel = try_relation_open_nolock(relationId);
722 	bool partitionedTable = false;
723 
724 	/* don't error out for tables that are dropped */
725 	if (rel == NULL)
726 	{
727 		return false;
728 	}
729 
730 	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
731 	{
732 		partitionedTable = true;
733 	}
734 
735 	/* keep the lock */
736 	table_close(rel, NoLock);
737 
738 	return partitionedTable;
739 }
740 
741 
742 /*
743  * Returns true if the given relation is a partition.
744  */
745 bool
PartitionTable(Oid relationId)746 PartitionTable(Oid relationId)
747 {
748 	Relation rel = try_relation_open(relationId, AccessShareLock);
749 
750 	/* don't error out for tables that are dropped */
751 	if (rel == NULL)
752 	{
753 		return false;
754 	}
755 
756 	bool partitionTable = rel->rd_rel->relispartition;
757 
758 	/* keep the lock */
759 	table_close(rel, NoLock);
760 
761 	return partitionTable;
762 }
763 
764 
765 /*
766  * Returns true if the given relation is a partition.  The function
767  * doesn't acquire any locks on the input relation, thus the caller is
768  * reponsible for holding the appropriate locks.
769  */
770 bool
PartitionTableNoLock(Oid relationId)771 PartitionTableNoLock(Oid relationId)
772 {
773 	Relation rel = try_relation_open_nolock(relationId);
774 
775 	/* don't error out for tables that are dropped */
776 	if (rel == NULL)
777 	{
778 		return false;
779 	}
780 
781 	bool partitionTable = rel->rd_rel->relispartition;
782 
783 	/* keep the lock */
784 	table_close(rel, NoLock);
785 
786 	return partitionTable;
787 }
788 
789 
790 /*
791  * try_relation_open_nolock opens a relation with given relationId without
792  * acquiring locks. PostgreSQL's try_relation_open() asserts that caller
793  * has already acquired a lock on the relation, which we don't always do.
794  *
795  * ATTENTION:
796  *   1. Sync this with try_relation_open(). It hasn't changed for 10 to 12
797  *      releases though.
798  *   2. We should remove this after we fix the locking/distributed deadlock
799  *      issues with MX Truncate. See https://github.com/citusdata/citus/pull/2894
800  *      for more discussion.
801  */
802 static Relation
try_relation_open_nolock(Oid relationId)803 try_relation_open_nolock(Oid relationId)
804 {
805 	if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relationId)))
806 	{
807 		return NULL;
808 	}
809 
810 	Relation relation = RelationIdGetRelation(relationId);
811 	if (!RelationIsValid(relation))
812 	{
813 		return NULL;
814 	}
815 
816 	pgstat_initstats(relation);
817 
818 	return relation;
819 }
820 
821 
822 /*
823  * IsChildTable returns true if the table is inherited. Note that
824  * partition tables inherites by default. However, this function
825  * returns false if the given table is a partition.
826  */
827 bool
IsChildTable(Oid relationId)828 IsChildTable(Oid relationId)
829 {
830 	ScanKeyData key[1];
831 	HeapTuple inheritsTuple = NULL;
832 	bool tableInherits = false;
833 
834 	Relation pgInherits = table_open(InheritsRelationId, AccessShareLock);
835 
836 	ScanKeyInit(&key[0], Anum_pg_inherits_inhrelid,
837 				BTEqualStrategyNumber, F_OIDEQ,
838 				ObjectIdGetDatum(relationId));
839 
840 	SysScanDesc scan = systable_beginscan(pgInherits, InvalidOid, false,
841 										  NULL, 1, key);
842 
843 	while ((inheritsTuple = systable_getnext(scan)) != NULL)
844 	{
845 		Oid inheritedRelationId =
846 			((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhrelid;
847 
848 		if (relationId == inheritedRelationId)
849 		{
850 			tableInherits = true;
851 			break;
852 		}
853 	}
854 
855 	systable_endscan(scan);
856 	table_close(pgInherits, AccessShareLock);
857 
858 	if (tableInherits && PartitionTable(relationId))
859 	{
860 		tableInherits = false;
861 	}
862 
863 	return tableInherits;
864 }
865 
866 
867 /*
868  * IsParentTable returns true if the table is inherited. Note that
869  * partitioned tables inherited by default. However, this function
870  * returns false if the given table is a partitioned table.
871  */
872 bool
IsParentTable(Oid relationId)873 IsParentTable(Oid relationId)
874 {
875 	ScanKeyData key[1];
876 	bool tableInherited = false;
877 
878 	Relation pgInherits = table_open(InheritsRelationId, AccessShareLock);
879 
880 	ScanKeyInit(&key[0], Anum_pg_inherits_inhparent,
881 				BTEqualStrategyNumber, F_OIDEQ,
882 				ObjectIdGetDatum(relationId));
883 
884 	SysScanDesc scan = systable_beginscan(pgInherits, InheritsParentIndexId, true,
885 										  NULL, 1, key);
886 
887 	if (systable_getnext(scan) != NULL)
888 	{
889 		tableInherited = true;
890 	}
891 	systable_endscan(scan);
892 	table_close(pgInherits, AccessShareLock);
893 
894 	if (tableInherited && PartitionedTable(relationId))
895 	{
896 		tableInherited = false;
897 	}
898 
899 	return tableInherited;
900 }
901 
902 
903 /*
904  * Wrapper around get_partition_parent
905  *
906  * Note: Because this function assumes that the relation whose OID is passed
907  * as an argument will have precisely one parent, it should only be called
908  * when it is known that the relation is a partition.
909  */
910 Oid
PartitionParentOid(Oid partitionOid)911 PartitionParentOid(Oid partitionOid)
912 {
913 	Oid partitionParentOid = get_partition_parent_compat(partitionOid, false);
914 
915 	return partitionParentOid;
916 }
917 
918 
919 /*
920  * PartitionWithLongestNameRelationId is a utility function that returns the
921  * oid of the partition table that has the longest name in terms of number of
922  * characters.
923  */
924 Oid
PartitionWithLongestNameRelationId(Oid parentRelationId)925 PartitionWithLongestNameRelationId(Oid parentRelationId)
926 {
927 	Oid longestNamePartitionId = InvalidOid;
928 	int longestNameLength = 0;
929 	List *partitionList = PartitionList(parentRelationId);
930 
931 	Oid partitionRelationId = InvalidOid;
932 	foreach_oid(partitionRelationId, partitionList)
933 	{
934 		char *partitionName = get_rel_name(partitionRelationId);
935 		int partitionNameLength = strnlen(partitionName, NAMEDATALEN);
936 		if (partitionNameLength > longestNameLength)
937 		{
938 			longestNamePartitionId = partitionRelationId;
939 			longestNameLength = partitionNameLength;
940 		}
941 	}
942 
943 	return longestNamePartitionId;
944 }
945 
946 
947 /*
948  * Takes a parent relation and returns Oid list of its partitions. The
949  * function errors out if the given relation is not a parent.
950  */
951 List *
PartitionList(Oid parentRelationId)952 PartitionList(Oid parentRelationId)
953 {
954 	Relation rel = table_open(parentRelationId, AccessShareLock);
955 	List *partitionList = NIL;
956 
957 
958 	if (!PartitionedTable(parentRelationId))
959 	{
960 		char *relationName = get_rel_name(parentRelationId);
961 
962 		ereport(ERROR, (errmsg("\"%s\" is not a parent table", relationName)));
963 	}
964 	PartitionDesc partDesc = RelationGetPartitionDesc_compat(rel, true);
965 	Assert(partDesc != NULL);
966 
967 	int partitionCount = partDesc->nparts;
968 	for (int partitionIndex = 0; partitionIndex < partitionCount; ++partitionIndex)
969 	{
970 		partitionList =
971 			lappend_oid(partitionList, partDesc->oids[partitionIndex]);
972 	}
973 
974 	/* keep the lock */
975 	table_close(rel, NoLock);
976 
977 	return partitionList;
978 }
979 
980 
981 /*
982  * GenerateDetachPartitionCommand gets a partition table and returns
983  * "ALTER TABLE parent_table DETACH PARTITION partitionName" command.
984  */
985 char *
GenerateDetachPartitionCommand(Oid partitionTableId)986 GenerateDetachPartitionCommand(Oid partitionTableId)
987 {
988 	StringInfo detachPartitionCommand = makeStringInfo();
989 
990 	if (!PartitionTable(partitionTableId))
991 	{
992 		char *relationName = get_rel_name(partitionTableId);
993 
994 		ereport(ERROR, (errmsg("\"%s\" is not a partition", relationName)));
995 	}
996 
997 	Oid parentId = get_partition_parent_compat(partitionTableId, false);
998 	char *tableQualifiedName = generate_qualified_relation_name(partitionTableId);
999 	char *parentTableQualifiedName = generate_qualified_relation_name(parentId);
1000 
1001 	appendStringInfo(detachPartitionCommand,
1002 					 "ALTER TABLE IF EXISTS %s DETACH PARTITION %s;",
1003 					 parentTableQualifiedName, tableQualifiedName);
1004 
1005 	return detachPartitionCommand->data;
1006 }
1007 
1008 
1009 /*
1010  * GenereatePartitioningInformation returns the partitioning type and partition column
1011  * for the given parent table in the form of "PARTITION TYPE (partitioning column(s)/expression(s))".
1012  */
1013 char *
GeneratePartitioningInformation(Oid parentTableId)1014 GeneratePartitioningInformation(Oid parentTableId)
1015 {
1016 	char *partitionBoundCString = "";
1017 
1018 	if (!PartitionedTable(parentTableId))
1019 	{
1020 		char *relationName = get_rel_name(parentTableId);
1021 
1022 		ereport(ERROR, (errmsg("\"%s\" is not a parent table", relationName)));
1023 	}
1024 
1025 	Datum partitionBoundDatum = DirectFunctionCall1(pg_get_partkeydef,
1026 													ObjectIdGetDatum(parentTableId));
1027 
1028 	partitionBoundCString = TextDatumGetCString(partitionBoundDatum);
1029 
1030 	return partitionBoundCString;
1031 }
1032 
1033 
1034 /*
1035  * GenerateAttachShardPartitionCommand generates command to attach a child table
1036  * table to its parent in a partitioning hierarchy.
1037  */
1038 char *
GenerateAttachShardPartitionCommand(ShardInterval * shardInterval)1039 GenerateAttachShardPartitionCommand(ShardInterval *shardInterval)
1040 {
1041 	Oid schemaId = get_rel_namespace(shardInterval->relationId);
1042 	char *schemaName = get_namespace_name(schemaId);
1043 	char *escapedSchemaName = quote_literal_cstr(schemaName);
1044 
1045 	char *command = GenerateAlterTableAttachPartitionCommand(shardInterval->relationId);
1046 	char *escapedCommand = quote_literal_cstr(command);
1047 	int shardIndex = ShardIndex(shardInterval);
1048 
1049 
1050 	StringInfo attachPartitionCommand = makeStringInfo();
1051 
1052 	Oid parentRelationId = PartitionParentOid(shardInterval->relationId);
1053 	if (parentRelationId == InvalidOid)
1054 	{
1055 		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1056 						errmsg("cannot attach partition"),
1057 						errdetail("Referenced relation cannot be found.")));
1058 	}
1059 
1060 	Oid parentSchemaId = get_rel_namespace(parentRelationId);
1061 	char *parentSchemaName = get_namespace_name(parentSchemaId);
1062 	char *escapedParentSchemaName = quote_literal_cstr(parentSchemaName);
1063 	uint64 parentShardId = ColocatedShardIdInRelation(parentRelationId, shardIndex);
1064 
1065 	appendStringInfo(attachPartitionCommand,
1066 					 WORKER_APPLY_INTER_SHARD_DDL_COMMAND, parentShardId,
1067 					 escapedParentSchemaName, shardInterval->shardId,
1068 					 escapedSchemaName, escapedCommand);
1069 
1070 	return attachPartitionCommand->data;
1071 }
1072 
1073 
1074 /*
1075  * GenerateAlterTableAttachPartitionCommand returns the necessary command to
1076  * attach the given partition to its parent.
1077  */
1078 char *
GenerateAlterTableAttachPartitionCommand(Oid partitionTableId)1079 GenerateAlterTableAttachPartitionCommand(Oid partitionTableId)
1080 {
1081 	StringInfo createPartitionCommand = makeStringInfo();
1082 
1083 
1084 	if (!PartitionTable(partitionTableId))
1085 	{
1086 		char *relationName = get_rel_name(partitionTableId);
1087 
1088 		ereport(ERROR, (errmsg("\"%s\" is not a partition", relationName)));
1089 	}
1090 
1091 	Oid parentId = get_partition_parent_compat(partitionTableId, false);
1092 	char *tableQualifiedName = generate_qualified_relation_name(partitionTableId);
1093 	char *parentTableQualifiedName = generate_qualified_relation_name(parentId);
1094 
1095 	char *partitionBoundCString = PartitionBound(partitionTableId);
1096 
1097 	appendStringInfo(createPartitionCommand, "ALTER TABLE %s ATTACH PARTITION %s %s;",
1098 					 parentTableQualifiedName, tableQualifiedName,
1099 					 partitionBoundCString);
1100 
1101 	return createPartitionCommand->data;
1102 }
1103 
1104 
1105 /*
1106  * This function heaviliy inspired from RelationBuildPartitionDesc()
1107  * which is avaliable in src/backend/catalog/partition.c.
1108  *
1109  * The function simply reads the pg_class and gets the partition bound.
1110  * Later, converts it to text format and returns.
1111  */
1112 static char *
PartitionBound(Oid partitionId)1113 PartitionBound(Oid partitionId)
1114 {
1115 	bool isnull = false;
1116 
1117 	HeapTuple tuple = SearchSysCache1(RELOID, partitionId);
1118 	if (!HeapTupleIsValid(tuple))
1119 	{
1120 		elog(ERROR, "cache lookup failed for relation %u", partitionId);
1121 	}
1122 
1123 	/*
1124 	 * It is possible that the pg_class tuple of a partition has not been
1125 	 * updated yet to set its relpartbound field.  The only case where
1126 	 * this happens is when we open the parent relation to check using its
1127 	 * partition descriptor that a new partition's bound does not overlap
1128 	 * some existing partition.
1129 	 */
1130 	if (!((Form_pg_class) GETSTRUCT(tuple))->relispartition)
1131 	{
1132 		ReleaseSysCache(tuple);
1133 		return "";
1134 	}
1135 
1136 	Datum datum = SysCacheGetAttr(RELOID, tuple,
1137 								  Anum_pg_class_relpartbound,
1138 								  &isnull);
1139 	Assert(!isnull);
1140 
1141 	Datum partitionBoundDatum =
1142 		DirectFunctionCall2(pg_get_expr, datum, ObjectIdGetDatum(partitionId));
1143 
1144 	char *partitionBoundString = TextDatumGetCString(partitionBoundDatum);
1145 
1146 	ReleaseSysCache(tuple);
1147 
1148 	return partitionBoundString;
1149 }
1150