1 /*-------------------------------------------------------------------------
2  *
3  * repair_shards.c
4  *
5  * This file contains functions to repair unhealthy shard placements using data
6  * from healthy ones.
7  *
8  * Copyright (c) Citus Data, Inc.
9  *
10  *-------------------------------------------------------------------------
11  */
12 
13 #include "postgres.h"
14 #include "fmgr.h"
15 #include "miscadmin.h"
16 
17 #include <string.h>
18 #include <sys/statvfs.h>
19 
20 #include "access/htup_details.h"
21 #include "catalog/pg_class.h"
22 #include "catalog/pg_enum.h"
23 #include "distributed/citus_ruleutils.h"
24 #include "distributed/colocation_utils.h"
25 #include "distributed/commands.h"
26 #include "distributed/connection_management.h"
27 #include "distributed/distributed_planner.h"
28 #include "distributed/listutils.h"
29 #include "distributed/shard_cleaner.h"
30 #include "distributed/coordinator_protocol.h"
31 #include "distributed/repair_shards.h"
32 #include "distributed/metadata_cache.h"
33 #include "distributed/metadata_sync.h"
34 #include "distributed/multi_join_order.h"
35 #include "distributed/multi_partitioning_utils.h"
36 #include "distributed/reference_table_utils.h"
37 #include "distributed/remote_commands.h"
38 #include "distributed/resource_lock.h"
39 #include "distributed/worker_manager.h"
40 #include "distributed/worker_protocol.h"
41 #include "distributed/worker_transaction.h"
42 #include "lib/stringinfo.h"
43 #include "nodes/pg_list.h"
44 #include "storage/lmgr.h"
45 #include "storage/lock.h"
47 #include "utils/builtins.h"
48 #include "utils/elog.h"
49 #include "utils/errcodes.h"
50 #include "utils/lsyscache.h"
51 #include "utils/palloc.h"
52 #include "utils/rel.h"
53 #include "utils/syscache.h"
54 
55 /* local function forward declarations */
56 static void ErrorIfTableCannotBeReplicated(Oid relationId);
57 static void RepairShardPlacement(int64 shardId, const char *sourceNodeName,
58 								 int32 sourceNodePort, const char *targetNodeName,
59 								 int32 targetNodePort);
60 static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
61 											 int32 sourceNodePort, char *targetNodeName,
62 											 int32 targetNodePort,
63 											 char shardReplicationMode);
64 static void CopyShardTables(List *shardIntervalList, char *sourceNodeName,
65 							int32 sourceNodePort, char *targetNodeName,
66 							int32 targetNodePort, bool useLogicalReplication);
67 static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
68 										  int32 sourceNodePort,
69 										  char *targetNodeName, int32 targetNodePort);
70 static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval,
71 											 const char *sourceNodeName,
72 											 int32 sourceNodePort);
73 static void EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName,
74 									 int32 sourceNodePort, const char *targetNodeName,
75 									 int32 targetNodePort);
76 static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName,
77 								   int32 sourceNodePort, const char *targetNodeName,
78 								   int32 targetNodePort);
79 static List * RecreateTableDDLCommandList(Oid relationId);
80 static void EnsureTableListOwner(List *tableIdList);
81 static void EnsureTableListSuitableForReplication(List *tableIdList);
82 
83 static void DropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName,
84 										int32 nodePort);
85 static void MarkForDropColocatedShardPlacement(ShardInterval *shardInterval,
86 											   char *nodeName, int32 nodePort);
87 static void UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId,
88 														   char *sourceNodeName,
89 														   int32 sourceNodePort,
90 														   char *targetNodeName,
91 														   int32 targetNodePort);
92 static void CheckSpaceConstraints(MultiConnection *connection,
93 								  uint64 colocationSizeInBytes);
94 static void EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
95 											  char *sourceNodeName, uint32 sourceNodePort,
96 											  char *targetNodeName, uint32
97 											  targetNodePort);
98 static List * RecreateShardDDLCommandList(ShardInterval *shardInterval,
99 										  const char *sourceNodeName,
100 										  int32 sourceNodePort);
101 static List * CopyShardContentsCommandList(ShardInterval *shardInterval,
102 										   const char *sourceNodeName,
103 										   int32 sourceNodePort);
104 static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval,
105 											   const char *sourceNodeName,
106 											   int32 sourceNodePort);
107 
108 
109 /* declarations for dynamic loading */
110 PG_FUNCTION_INFO_V1(citus_copy_shard_placement);
111 PG_FUNCTION_INFO_V1(master_copy_shard_placement);
112 PG_FUNCTION_INFO_V1(citus_move_shard_placement);
113 PG_FUNCTION_INFO_V1(master_move_shard_placement);
114 
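/*
 * When true, the source placements of a moved shard are only marked for
 * deferred deletion (see MarkForDropColocatedShardPlacement) instead of being
 * dropped right after the copy.
 */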
115 bool DeferShardDeleteOnMove = false;
116 
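/*
 * Knobs for the pre-move disk space check: a move errors out unless the target
 * node would still have at least DesiredPercentFreeAfterMove percent of its
 * disk free after the move. The check can be disabled entirely by setting
 * CheckAvailableSpaceBeforeMove to false.
 */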
117 double DesiredPercentFreeAfterMove = 10;
118 bool CheckAvailableSpaceBeforeMove = true;
119 
120 
121 /*
122  * citus_copy_shard_placement implements a user-facing UDF to repair data from
123  * a healthy (source) node to an inactive (target) node. To accomplish this it
124  * entirely recreates the table structure before copying all data. During this
125  * time all modifications are paused to the shard. After successful repair, the
126  * inactive placement is marked healthy and modifications may continue. If the
127  * repair fails at any point, this function throws an error, leaving the node
 * in an unhealthy state. Please note that citus_copy_shard_placement copies the
 * given shard along with its co-located shards.
130  */
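/*
 * Illustrative invocation (the shard id, node names, and ports below are
 * hypothetical; defaults and the exact SQL-level signature come from the
 * extension's SQL definition):
 *
 *   SELECT citus_copy_shard_placement(102008,
 *                                     'source-host', 5432,
 *                                     'target-host', 5432,
 *                                     false, 'block_writes');
 */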
131 Datum
citus_copy_shard_placement(PG_FUNCTION_ARGS)
133 {
134 	CheckCitusVersion(ERROR);
135 	EnsureCoordinator();
136 
137 	int64 shardId = PG_GETARG_INT64(0);
138 	text *sourceNodeNameText = PG_GETARG_TEXT_P(1);
139 	int32 sourceNodePort = PG_GETARG_INT32(2);
140 	text *targetNodeNameText = PG_GETARG_TEXT_P(3);
141 	int32 targetNodePort = PG_GETARG_INT32(4);
142 	bool doRepair = PG_GETARG_BOOL(5);
143 	Oid shardReplicationModeOid = PG_GETARG_OID(6);
144 
145 	char *sourceNodeName = text_to_cstring(sourceNodeNameText);
146 	char *targetNodeName = text_to_cstring(targetNodeNameText);
147 
148 	char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
149 	if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL)
150 	{
151 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
152 						errmsg("the force_logical transfer mode is currently "
153 							   "unsupported")));
154 	}
155 
156 	ShardInterval *shardInterval = LoadShardInterval(shardId);
157 	ErrorIfTableCannotBeReplicated(shardInterval->relationId);
158 
159 	if (doRepair)
160 	{
161 		RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
162 							 targetNodePort);
163 	}
164 	else
165 	{
166 		ReplicateColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
167 										 targetNodeName, targetNodePort,
168 										 shardReplicationMode);
169 	}
170 
171 	PG_RETURN_VOID();
172 }
173 
174 
175 /*
176  * master_copy_shard_placement is a wrapper function for old UDF name.
177  */
178 Datum
master_copy_shard_placement(PG_FUNCTION_ARGS)
180 {
181 	return citus_copy_shard_placement(fcinfo);
182 }
183 
184 
185 /*
186  * ShardListSizeInBytes returns the size in bytes of a set of shard tables.
187  */
188 uint64
ShardListSizeInBytes(List *shardList, char *workerNodeName, uint32
					 workerNodePort)
191 {
192 	uint32 connectionFlag = 0;
193 
194 	/* we skip child tables of a partitioned table if this boolean variable is true */
195 	bool optimizePartitionCalculations = true;
196 	StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(shardList,
197 																	  TOTAL_RELATION_SIZE,
198 																	  optimizePartitionCalculations);
199 
200 	MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
201 													workerNodePort);
202 	PGresult *result = NULL;
203 	int queryResult = ExecuteOptionalRemoteCommand(connection, tableSizeQuery->data,
204 												   &result);
205 
206 	if (queryResult != RESPONSE_OKAY)
207 	{
208 		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
209 						errmsg("cannot get the size because of a connection error")));
210 	}
211 
212 	List *sizeList = ReadFirstColumnAsText(result);
213 	if (list_length(sizeList) != 1)
214 	{
215 		ereport(ERROR, (errmsg(
216 							"received wrong number of rows from worker, expected 1 received %d",
217 							list_length(sizeList))));
218 	}
219 
220 	StringInfo totalSizeStringInfo = (StringInfo) linitial(sizeList);
221 	char *totalSizeString = totalSizeStringInfo->data;
222 	uint64 totalSize = SafeStringToUint64(totalSizeString);
223 
224 	PQclear(result);
225 	ForgetResults(connection);
226 
227 	return totalSize;
228 }
229 
230 
231 /*
 * CheckSpaceConstraints checks that there is enough disk space to place the
 * colocation group on the node that the connection is connected to.
234  */
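/*
 * Worked example with illustrative numbers: on a 1000 GB disk with 150 GB
 * available, moving a 60 GB colocation group would leave 90 GB available.
 * With the default DesiredPercentFreeAfterMove of 10, the desired free space
 * is 100 GB, so the check below errors out.
 */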
235 static void
CheckSpaceConstraints(MultiConnection *connection, uint64 colocationSizeInBytes)
237 {
238 	uint64 diskAvailableInBytes = 0;
239 	uint64 diskSizeInBytes = 0;
240 	bool success =
241 		GetNodeDiskSpaceStatsForConnection(connection, &diskAvailableInBytes,
242 										   &diskSizeInBytes);
243 	if (!success)
244 	{
245 		ereport(ERROR, (errmsg("Could not fetch disk stats for node: %s-%d",
246 							   connection->hostname, connection->port)));
247 	}
248 
249 	uint64 diskAvailableInBytesAfterShardMove = 0;
250 	if (diskAvailableInBytes < colocationSizeInBytes)
251 	{
		/*
		 * The available space after the move would be negative; we leave it
		 * at 0 for convenience.
		 */
		diskAvailableInBytesAfterShardMove = 0;
256 	}
257 	else
258 	{
259 		diskAvailableInBytesAfterShardMove = diskAvailableInBytes - colocationSizeInBytes;
260 	}
261 	uint64 desiredNewDiskAvailableInBytes = diskSizeInBytes *
262 											(DesiredPercentFreeAfterMove / 100);
263 	if (diskAvailableInBytesAfterShardMove < desiredNewDiskAvailableInBytes)
264 	{
265 		ereport(ERROR, (errmsg("not enough empty space on node if the shard is moved, "
266 							   "actual available space after move will be %ld bytes, "
267 							   "desired available space after move is %ld bytes,"
268 							   "estimated size increase on node after move is %ld bytes.",
269 							   diskAvailableInBytesAfterShardMove,
270 							   desiredNewDiskAvailableInBytes, colocationSizeInBytes),
271 						errhint(
272 							"consider lowering citus.desired_percent_disk_available_after_move.")));
273 	}
274 }
275 
276 
277 /*
278  * citus_move_shard_placement moves given shard (and its co-located shards) from one
279  * node to the other node. To accomplish this it entirely recreates the table structure
280  * before copying all data.
281  *
 * After that, there are two different paths. The first one is a blocking shard move,
 * in the sense that all modifications to the shard are paused for the duration of the
 * move. The second one relies on logical replication, meaning that writes are blocked
 * only for a very short duration, essentially only while the metadata is being updated.
 * This option is currently only available in Citus Enterprise.
 *
 * After a successful move operation, the shard placements on the source node are
 * deleted. If the move fails at any point, this function throws an error, leaving the
 * cluster unchanged on both the source and the target node.
291  */
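/*
 * Illustrative invocation (the shard id, node names, and ports below are
 * hypothetical; the exact SQL-level signature comes from the extension's SQL
 * definition):
 *
 *   SELECT citus_move_shard_placement(102008,
 *                                     'source-host', 5432,
 *                                     'target-host', 5432,
 *                                     'block_writes');
 */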
292 Datum
citus_move_shard_placement(PG_FUNCTION_ARGS)
294 {
295 	CheckCitusVersion(ERROR);
296 	EnsureCoordinator();
297 
298 	int64 shardId = PG_GETARG_INT64(0);
299 	char *sourceNodeName = text_to_cstring(PG_GETARG_TEXT_P(1));
300 	int32 sourceNodePort = PG_GETARG_INT32(2);
301 	char *targetNodeName = text_to_cstring(PG_GETARG_TEXT_P(3));
302 	int32 targetNodePort = PG_GETARG_INT32(4);
303 	Oid shardReplicationModeOid = PG_GETARG_OID(5);
304 
305 
306 	ListCell *colocatedTableCell = NULL;
307 	ListCell *colocatedShardCell = NULL;
308 
309 	Oid relationId = RelationIdForShard(shardId);
310 	ErrorIfMoveUnsupportedTableType(relationId);
311 	ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort);
312 
313 	ShardInterval *shardInterval = LoadShardInterval(shardId);
314 	Oid distributedTableId = shardInterval->relationId;
315 
316 	List *colocatedTableList = ColocatedTableList(distributedTableId);
317 	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
318 
319 	foreach(colocatedTableCell, colocatedTableList)
320 	{
321 		Oid colocatedTableId = lfirst_oid(colocatedTableCell);
322 		char relationKind = '\0';
323 
324 		/* check that user has owner rights in all co-located tables */
325 		EnsureTableOwner(colocatedTableId);
326 
327 		/*
328 		 * Block concurrent DDL / TRUNCATE commands on the relation. Similarly,
329 		 * block concurrent citus_move_shard_placement() on any shard of
330 		 * the same relation. This is OK for now since we're executing shard
331 		 * moves sequentially anyway.
332 		 */
333 		LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
334 
335 		relationKind = get_rel_relkind(colocatedTableId);
336 		if (relationKind == RELKIND_FOREIGN_TABLE)
337 		{
338 			char *relationName = get_rel_name(colocatedTableId);
339 			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
340 							errmsg("cannot repair shard"),
341 							errdetail("Table %s is a foreign table. Repairing "
342 									  "shards backed by foreign tables is "
343 									  "not supported.", relationName)));
344 		}
345 	}
346 
347 	/* we sort colocatedShardList so that lock operations will not cause any deadlocks */
348 	colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
349 	foreach(colocatedShardCell, colocatedShardList)
350 	{
351 		ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
352 		uint64 colocatedShardId = colocatedShard->shardId;
353 
354 		EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort,
355 							   targetNodeName, targetNodePort);
356 	}
357 
358 	char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
359 	if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL)
360 	{
361 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
362 						errmsg("the force_logical transfer mode is currently "
363 							   "unsupported")));
364 	}
365 
366 	EnsureEnoughDiskSpaceForShardMove(colocatedShardList, sourceNodeName, sourceNodePort,
367 									  targetNodeName, targetNodePort);
368 
369 	BlockWritesToShardList(colocatedShardList);
370 
371 	/*
	 * CopyShardTables() copies the given shard along with its co-located
	 * shards.
374 	 */
375 	bool useLogicalReplication = false;
376 	CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
377 					targetNodePort, useLogicalReplication);
378 
379 	ShardInterval *colocatedShard = NULL;
380 	foreach_ptr(colocatedShard, colocatedShardList)
381 	{
382 		uint64 colocatedShardId = colocatedShard->shardId;
383 		uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
384 		uint64 placementId = GetNextPlacementId();
385 
386 		InsertShardPlacementRow(colocatedShardId, placementId,
387 								SHARD_STATE_ACTIVE, ShardLength(colocatedShardId),
388 								groupId);
389 	}
390 
391 	/* since this is move operation, we remove shards from source node after copy */
392 	if (DeferShardDeleteOnMove)
393 	{
394 		MarkForDropColocatedShardPlacement(shardInterval, sourceNodeName, sourceNodePort);
395 	}
396 	else
397 	{
398 		DropColocatedShardPlacement(shardInterval, sourceNodeName, sourceNodePort);
399 	}
400 
401 	UpdateColocatedShardPlacementMetadataOnWorkers(shardId, sourceNodeName,
402 												   sourceNodePort, targetNodeName,
403 												   targetNodePort);
404 
405 	PG_RETURN_VOID();
406 }
407 
408 
409 /*
 * EnsureEnoughDiskSpaceForShardMove checks that the target node has enough disk
 * space for a move of the given colocated shard list from the source node, and
 * errors out otherwise. The check can be skipped via CheckAvailableSpaceBeforeMove.
413  */
414 static void
EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
								  char *sourceNodeName, uint32 sourceNodePort,
								  char *targetNodeName, uint32 targetNodePort)
418 {
419 	if (!CheckAvailableSpaceBeforeMove)
420 	{
421 		return;
422 	}
423 	uint64 colocationSizeInBytes = ShardListSizeInBytes(colocatedShardList,
424 														sourceNodeName,
425 														sourceNodePort);
426 
427 	uint32 connectionFlag = 0;
428 	MultiConnection *connection = GetNodeConnection(connectionFlag, targetNodeName,
429 													targetNodePort);
430 	CheckSpaceConstraints(connection, colocationSizeInBytes);
431 }
432 
433 
434 /*
435  * ErrorIfTargetNodeIsNotSafeToMove throws error if the target node is not
436  * eligible for moving shards.
437  */
438 void
ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int targetNodePort)
440 {
441 	WorkerNode *workerNode = FindWorkerNode(targetNodeName, targetNodePort);
442 	if (workerNode == NULL)
443 	{
444 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
445 						errmsg("Moving shards to a non-existing node is not supported"),
446 						errhint(
447 							"Add the target node via SELECT citus_add_node('%s', %d);",
448 							targetNodeName, targetNodePort)));
449 	}
450 
451 	if (!workerNode->isActive)
452 	{
453 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
454 						errmsg("Moving shards to a non-active node is not supported"),
455 						errhint(
456 							"Activate the target node via SELECT citus_activate_node('%s', %d);",
457 							targetNodeName, targetNodePort)));
458 	}
459 
460 	if (!workerNode->shouldHaveShards)
461 	{
462 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
463 						errmsg("Moving shards to a node that shouldn't have a shard is "
464 							   "not supported"),
465 						errhint("Allow shards on the target node via "
466 								"SELECT * FROM citus_set_node_property('%s', %d, 'shouldhaveshards', true);",
467 								targetNodeName, targetNodePort)));
468 	}
469 
470 	if (!NodeIsPrimary(workerNode))
471 	{
472 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
473 						errmsg("Moving shards to a secondary (e.g., replica) node is "
474 							   "not supported")));
475 	}
476 }
477 
478 
479 /*
480  * master_move_shard_placement is a wrapper around citus_move_shard_placement.
481  */
482 Datum
master_move_shard_placement(PG_FUNCTION_ARGS)
484 {
485 	return citus_move_shard_placement(fcinfo);
486 }
487 
488 
489 /*
 * ErrorIfMoveUnsupportedTableType is a helper function for the
 * rebalance_table_shards and citus_move_shard_placement UDFs to error out if
 * the relation with the given relationId is not a distributed table.
493  */
494 void
ErrorIfMoveUnsupportedTableType(Oid relationId)
496 {
497 	if (IsCitusTableType(relationId, DISTRIBUTED_TABLE))
498 	{
499 		return;
500 	}
501 
502 	char *qualifiedRelationName = generate_qualified_relation_name(relationId);
503 	if (!IsCitusTable(relationId))
504 	{
505 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
506 						errmsg("table %s is a regular postgres table, you can "
507 							   "only move shards of a citus table",
508 							   qualifiedRelationName)));
509 	}
510 	else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
511 	{
512 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
513 						errmsg("table %s is a local table, moving shard of "
514 							   "a local table added to metadata is currently "
515 							   "not supported", qualifiedRelationName)));
516 	}
517 	else if (IsCitusTableType(relationId, REFERENCE_TABLE))
518 	{
519 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
520 						errmsg("table %s is a reference table, moving shard of "
521 							   "a reference table is not supported",
522 							   qualifiedRelationName)));
523 	}
524 }
525 
526 
527 /*
 * BlockWritesToShardList blocks writes to all shards in the given shard
529  * list. The function assumes that all the shards in the list are colocated.
530  */
531 void
BlockWritesToShardList(List *shardList)
533 {
534 	ShardInterval *shard = NULL;
535 	foreach_ptr(shard, shardList)
536 	{
537 		/*
538 		 * We need to lock the referenced reference table metadata to avoid
539 		 * asynchronous shard copy in case of cascading DML operations.
540 		 */
541 		LockReferencedReferenceShardDistributionMetadata(shard->shardId,
542 														 ExclusiveLock);
543 
544 		LockShardDistributionMetadata(shard->shardId, ExclusiveLock);
545 	}
546 
547 	/* following code relies on the list to have at least one shard */
548 	if (list_length(shardList) == 0)
549 	{
550 		return;
551 	}
552 
553 	/*
554 	 * Since the function assumes that the input shards are colocated,
555 	 * calculating shouldSyncMetadata for a single table is sufficient.
556 	 */
557 	ShardInterval *firstShardInterval = (ShardInterval *) linitial(shardList);
558 	Oid firstDistributedTableId = firstShardInterval->relationId;
559 
560 	bool shouldSyncMetadata = ShouldSyncTableMetadata(firstDistributedTableId);
561 	if (shouldSyncMetadata)
562 	{
563 		LockShardListMetadataOnWorkers(ExclusiveLock, shardList);
564 	}
565 }
566 
567 
568 /*
 * ErrorIfTableCannotBeReplicated errors out if the given table is not suitable
 * for having its shards replicated. There are two cases in which shard
 * replication is not allowed:
 *
 * 1) MX tables, since replication factor 1 is required for MX tables
 * 2) Reference tables, since their shards should already exist on all workers
575  */
576 static void
ErrorIfTableCannotBeReplicated(Oid relationId)
578 {
579 	/*
580 	 * Note that ShouldSyncTableMetadata() returns true for both MX tables
581 	 * and reference tables.
582 	 */
583 	bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId);
584 	if (!shouldSyncMetadata)
585 	{
586 		return;
587 	}
588 
589 	CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
590 	char *relationName = get_rel_name(relationId);
591 
592 	if (IsCitusTableTypeCacheEntry(tableEntry, CITUS_LOCAL_TABLE))
593 	{
594 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
595 						(errmsg("Table %s is a local table. Replicating "
596 								"shard of a local table added to metadata "
597 								"currently is not supported",
598 								quote_literal_cstr(relationName)))));
599 	}
600 
601 	/*
	 * ShouldSyncTableMetadata() also returns true for reference tables, but
603 	 * we don't want to error in that case since reference tables aren't
604 	 * automatically replicated to active nodes with no shards, and
605 	 * master_copy_shard_placement() can be used to create placements in
606 	 * such nodes.
607 	 */
608 	if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING)
609 	{
610 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
611 						(errmsg("Table %s is streaming replicated. Shards "
612 								"of streaming replicated tables cannot "
613 								"be copied", quote_literal_cstr(relationName)))));
614 	}
615 }
616 
617 
618 /*
619  * LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum
620  * values to a char.
621  */
622 char
LookupShardTransferMode(Oid shardReplicationModeOid)
624 {
625 	char shardReplicationMode = 0;
626 
627 	Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardReplicationModeOid);
628 	char *enumLabel = DatumGetCString(enumLabelDatum);
629 
630 	if (strncmp(enumLabel, "auto", NAMEDATALEN) == 0)
631 	{
632 		shardReplicationMode = TRANSFER_MODE_AUTOMATIC;
633 	}
634 	else if (strncmp(enumLabel, "force_logical", NAMEDATALEN) == 0)
635 	{
636 		shardReplicationMode = TRANSFER_MODE_FORCE_LOGICAL;
637 	}
638 	else if (strncmp(enumLabel, "block_writes", NAMEDATALEN) == 0)
639 	{
640 		shardReplicationMode = TRANSFER_MODE_BLOCK_WRITES;
641 	}
642 	else
643 	{
644 		ereport(ERROR, (errmsg("invalid label for enum: %s", enumLabel)));
645 	}
646 
647 	return shardReplicationMode;
648 }
649 
650 
651 /*
 * RepairShardPlacement repairs the given shard from a source node to a target
 * node. This function is not co-location aware; it only repairs the given shard.
654  */
655 static void
RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
					 const char *targetNodeName, int32 targetNodePort)
658 {
659 	ShardInterval *shardInterval = LoadShardInterval(shardId);
660 	Oid distributedTableId = shardInterval->relationId;
661 
662 	char relationKind = get_rel_relkind(distributedTableId);
663 	char *tableOwner = TableOwner(shardInterval->relationId);
664 
665 	/* prevent table from being dropped */
666 	LockRelationOid(distributedTableId, AccessShareLock);
667 
668 	EnsureTableOwner(distributedTableId);
669 
670 	if (relationKind == RELKIND_FOREIGN_TABLE)
671 	{
672 		char *relationName = get_rel_name(distributedTableId);
673 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
674 						errmsg("cannot repair shard"),
675 						errdetail("Table %s is a foreign table. Repairing "
676 								  "shards backed by foreign tables is "
677 								  "not supported.", relationName)));
678 	}
679 
680 	/*
681 	 * Let's not allow repairing partitions to prevent any edge cases.
682 	 * We're already not allowing any kind of modifications on the partitions
683 	 * so their placements are not likely to be marked as INVALID. The only
684 	 * possible case to mark placement of a partition as invalid is
685 	 * "ALTER TABLE parent_table DETACH PARTITION partition_table". But,
686 	 * given that the table would become a regular distributed table if the
687 	 * command succeeds, we're OK since the regular distributed tables can
688 	 * be repaired later on.
689 	 */
690 	EnsurePartitionTableNotReplicated(distributedTableId);
691 
692 	/*
693 	 * We take a lock on the referenced table if there is a foreign constraint
694 	 * during the copy procedure. If we do not block DMLs on the referenced
695 	 * table, we cannot avoid the inconsistency between the two copies of the
696 	 * data. Currently, we do not support replication factor > 1 on the tables
697 	 * with foreign constraints, so this command will fail for this case anyway.
698 	 * However, it is taken as a precaution in case we support it one day.
699 	 */
700 	LockReferencedReferenceShardDistributionMetadata(shardId, ExclusiveLock);
701 
702 	/*
703 	 * We plan to move the placement to the healthy state, so we need to grab a shard
704 	 * metadata lock (in exclusive mode).
705 	 */
706 	LockShardDistributionMetadata(shardId, ExclusiveLock);
707 
708 	/*
	 * For shard repair, there should be a healthy placement on the source node
	 * and an unhealthy placement on the target node.
711 	 */
712 	EnsureShardCanBeRepaired(shardId, sourceNodeName, sourceNodePort, targetNodeName,
713 							 targetNodePort);
714 
715 	/*
716 	 * If the shard belongs to a partitioned table, we need to load the data after
	 * creating the partitions and the partitioning hierarchy.
718 	 */
719 	bool partitionedTable = PartitionedTableNoLock(distributedTableId);
720 	bool includeData = !partitionedTable;
721 
722 	/* we generate necessary commands to recreate the shard in target node */
723 	List *ddlCommandList =
724 		CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort, includeData);
725 
726 	List *foreignConstraintCommandList = CopyShardForeignConstraintCommandList(
727 		shardInterval);
728 	ddlCommandList = list_concat(ddlCommandList, foreignConstraintCommandList);
729 
730 	/*
731 	 * CopyShardCommandList() drops the table which cascades to partitions if the
732 	 * table is a partitioned table. This means that we need to create both parent
733 	 * table and its partitions.
734 	 *
735 	 * We also skipped copying the data, so include it here.
736 	 */
737 	if (partitionedTable)
738 	{
739 		char *shardName = ConstructQualifiedShardName(shardInterval);
740 		StringInfo copyShardDataCommand = makeStringInfo();
741 
742 		List *partitionCommandList =
743 			CopyPartitionShardsCommandList(shardInterval, sourceNodeName, sourceNodePort);
744 		ddlCommandList = list_concat(ddlCommandList, partitionCommandList);
745 
746 		/* finally copy the data as well */
747 		appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
748 						 quote_literal_cstr(shardName), /* table to append */
749 						 quote_literal_cstr(shardName), /* remote table name */
750 						 quote_literal_cstr(sourceNodeName), /* remote host */
751 						 sourceNodePort); /* remote port */
752 		ddlCommandList = lappend(ddlCommandList, copyShardDataCommand->data);
753 	}
754 
755 	EnsureNoModificationsHaveBeenDone();
756 	SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, tableOwner,
757 											  ddlCommandList);
758 
	/* after successful repair, we update the shard state as healthy */
760 	List *placementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
761 	ShardPlacement *placement = SearchShardPlacementInListOrError(placementList,
762 																  targetNodeName,
763 																  targetNodePort);
764 	UpdateShardPlacementState(placement->placementId, SHARD_STATE_ACTIVE);
765 }
766 
767 
768 /*
769  * ReplicateColocatedShardPlacement replicates the given shard and its
770  * colocated shards from a source node to target node.
771  */
772 static void
ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
								 int32 sourceNodePort, char *targetNodeName,
								 int32 targetNodePort, char shardReplicationMode)
776 {
777 	ShardInterval *shardInterval = LoadShardInterval(shardId);
778 	Oid distributedTableId = shardInterval->relationId;
779 
780 	List *colocatedTableList = ColocatedTableList(distributedTableId);
781 	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
782 
783 	EnsureTableListOwner(colocatedTableList);
784 	EnsureTableListSuitableForReplication(colocatedTableList);
785 
786 	/*
787 	 * We sort shardIntervalList so that lock operations will not cause any
788 	 * deadlocks.
789 	 */
790 	colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
791 
792 	BlockWritesToShardList(colocatedShardList);
793 
794 	ShardInterval *colocatedShard = NULL;
795 	foreach_ptr(colocatedShard, colocatedShardList)
796 	{
797 		uint64 colocatedShardId = colocatedShard->shardId;
798 
799 		/*
		 * For shard copy, there should be a healthy placement on the source
		 * node and no placement on the target node.
802 		 */
803 		EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort,
804 							   targetNodeName, targetNodePort);
805 	}
806 
807 	if (!IsCitusTableType(distributedTableId, REFERENCE_TABLE))
808 	{
809 		/*
810 		 * When copying a shard to a new node, we should first ensure that reference
811 		 * tables are present such that joins work immediately after copying the shard.
812 		 * When copying a reference table, we are probably trying to achieve just that.
813 		 *
		 * Since this is a long-running operation we do this after the error checks, but
815 		 * before taking metadata locks.
816 		 */
817 		EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode);
818 	}
819 
820 	bool useLogicalReplication = false;
821 	CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort,
822 					targetNodeName, targetNodePort, useLogicalReplication);
823 
824 	/*
825 	 * Finally insert the placements to pg_dist_placement and sync it to the
826 	 * metadata workers.
827 	 */
828 	foreach_ptr(colocatedShard, colocatedShardList)
829 	{
830 		uint64 colocatedShardId = colocatedShard->shardId;
831 		uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
832 		uint64 placementId = GetNextPlacementId();
833 
834 		InsertShardPlacementRow(colocatedShardId, placementId,
835 								SHARD_STATE_ACTIVE, ShardLength(colocatedShardId),
836 								groupId);
837 
838 		if (ShouldSyncTableMetadata(colocatedShard->relationId))
839 		{
840 			char *placementCommand = PlacementUpsertCommand(colocatedShardId, placementId,
841 															SHARD_STATE_ACTIVE, 0,
842 															groupId);
843 
844 			SendCommandToWorkersWithMetadata(placementCommand);
845 		}
846 	}
847 }
848 
849 
850 /*
851  * EnsureTableListOwner ensures current user owns given tables. Superusers
852  * are regarded as owners.
853  */
854 static void
EnsureTableListOwner(List *tableIdList)
856 {
857 	Oid tableId = InvalidOid;
858 	foreach_oid(tableId, tableIdList)
859 	{
860 		EnsureTableOwner(tableId);
861 	}
862 }
863 
864 
865 /*
866  * EnsureTableListSuitableForReplication errors out if given tables are not
867  * suitable for replication.
868  */
869 static void
EnsureTableListSuitableForReplication(List *tableIdList)
871 {
872 	Oid tableId = InvalidOid;
873 	foreach_oid(tableId, tableIdList)
874 	{
875 		char relationKind = get_rel_relkind(tableId);
876 		if (relationKind == RELKIND_FOREIGN_TABLE)
877 		{
878 			char *relationName = get_rel_name(tableId);
879 			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
880 							errmsg("cannot replicate shard"),
881 							errdetail("Table %s is a foreign table. Replicating "
882 									  "shards backed by foreign tables is "
883 									  "not supported.", relationName)));
884 		}
885 
886 		List *foreignConstraintCommandList =
887 			GetReferencingForeignConstaintCommands(tableId);
888 
889 		if (foreignConstraintCommandList != NIL &&
890 			IsCitusTableType(tableId, DISTRIBUTED_TABLE))
891 		{
892 			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
893 							errmsg("cannot replicate shards with foreign keys")));
894 		}
895 	}
896 }
897 
898 
899 /*
900  * CopyShardTables copies a shard along with its co-located shards from a source
 * node to a target node. It does not make any checks about the state of the
 * shards. It is the caller's responsibility to make those checks if they are necessary.
903  */
904 static void
CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort,
				char *targetNodeName, int32 targetNodePort, bool useLogicalReplication)
907 {
908 	if (list_length(shardIntervalList) < 1)
909 	{
910 		return;
911 	}
912 
913 	if (useLogicalReplication)
914 	{
915 		/* only supported in Citus enterprise */
916 	}
917 	else
918 	{
919 		CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort,
920 									  targetNodeName, targetNodePort);
921 	}
922 }
923 
924 
925 /*
 * CopyShardTablesViaBlockWrites copies a shard along with its co-located shards
 * from a source node to a target node via COPY command. While the command is in
 * progress, modifications on the source node are blocked.
929  */
930 static void
CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
							  int32 sourceNodePort, char *targetNodeName,
							  int32 targetNodePort)
934 {
935 	MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
936 													   "CopyShardTablesViaBlockWrites",
937 													   ALLOCSET_DEFAULT_SIZES);
938 	MemoryContext oldContext = MemoryContextSwitchTo(localContext);
939 
940 	/* iterate through the colocated shards and copy each */
941 	ShardInterval *shardInterval = NULL;
942 	foreach_ptr(shardInterval, shardIntervalList)
943 	{
944 		/*
945 		 * For each shard we first create the shard table in a separate
946 		 * transaction and then we copy the data and create the indexes in a
947 		 * second separate transaction. The reason we don't do both in a single
948 		 * transaction is so we can see the size of the new shard growing
949 		 * during the copy when we run get_rebalance_progress in another
		 * session. If we didn't split these two phases up, then the table
951 		 * wouldn't be visible in the session that get_rebalance_progress uses.
952 		 * So get_rebalance_progress would always report its size as 0.
953 		 */
954 		List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
955 														   sourceNodePort);
956 		char *tableOwner = TableOwner(shardInterval->relationId);
957 		SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
958 												  tableOwner, ddlCommandList);
959 
960 		ddlCommandList = NIL;
961 
962 		/*
963 		 * Skip copying data for partitioned tables, because they contain no
964 		 * data themselves. Their partitions do contain data, but those are
		 * different colocated shards that will be copied separately.
966 		 */
967 		if (!PartitionedTable(shardInterval->relationId))
968 		{
969 			ddlCommandList = CopyShardContentsCommandList(shardInterval, sourceNodeName,
970 														  sourceNodePort);
971 		}
972 		ddlCommandList = list_concat(
973 			ddlCommandList,
974 			PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
975 											 sourceNodePort));
976 		SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
977 												  tableOwner, ddlCommandList);
978 
979 		MemoryContextReset(localContext);
980 	}
981 
982 	/*
983 	 * Once all shards are created, we can recreate relationships between shards.
984 	 *
985 	 * Iterate through the colocated shards and create the foreign constraints and
986 	 * attach child tables to their parents in a partitioning hierarchy.
987 	 */
988 	foreach_ptr(shardInterval, shardIntervalList)
989 	{
990 		List *shardForeignConstraintCommandList = NIL;
991 		List *referenceTableForeignConstraintList = NIL;
992 		List *commandList = NIL;
993 
994 		CopyShardForeignConstraintCommandListGrouped(shardInterval,
995 													 &shardForeignConstraintCommandList,
996 													 &referenceTableForeignConstraintList);
997 
998 		commandList = list_concat(commandList, shardForeignConstraintCommandList);
999 		commandList = list_concat(commandList, referenceTableForeignConstraintList);
1000 
1001 		if (PartitionTable(shardInterval->relationId))
1002 		{
1003 			char *attachPartitionCommand =
1004 				GenerateAttachShardPartitionCommand(shardInterval);
1005 
1006 			commandList = lappend(commandList, attachPartitionCommand);
1007 		}
1008 
1009 		char *tableOwner = TableOwner(shardInterval->relationId);
1010 		SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
1011 												  tableOwner, commandList);
1012 
1013 		MemoryContextReset(localContext);
1014 	}
1015 
1016 	MemoryContextSwitchTo(oldContext);
1017 }
1018 
1019 
1020 /*
1021  * CopyPartitionShardsCommandList gets a shardInterval which is a shard that
 * belongs to a partitioned table (this is asserted).
1023  *
1024  * The function returns a list of commands which re-creates all the partitions
1025  * of the input shardInterval.
1026  */
1027 static List *
CopyPartitionShardsCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
							   int32 sourceNodePort)
1030 {
1031 	Oid distributedTableId = shardInterval->relationId;
1032 	List *ddlCommandList = NIL;
1033 
1034 	Assert(PartitionedTableNoLock(distributedTableId));
1035 
1036 	List *partitionList = PartitionList(distributedTableId);
1037 	Oid partitionOid = InvalidOid;
1038 	foreach_oid(partitionOid, partitionList)
1039 	{
1040 		uint64 partitionShardId =
1041 			ColocatedShardIdInRelation(partitionOid, shardInterval->shardIndex);
1042 		ShardInterval *partitionShardInterval = LoadShardInterval(partitionShardId);
1043 		bool includeData = false;
1044 
1045 		List *copyCommandList =
1046 			CopyShardCommandList(partitionShardInterval, sourceNodeName, sourceNodePort,
1047 								 includeData);
1048 		ddlCommandList = list_concat(ddlCommandList, copyCommandList);
1049 
1050 		char *attachPartitionCommand =
1051 			GenerateAttachShardPartitionCommand(partitionShardInterval);
1052 		ddlCommandList = lappend(ddlCommandList, attachPartitionCommand);
1053 	}
1054 
1055 	return ddlCommandList;
1056 }
1057 
1058 
1059 /*
 * EnsureShardCanBeRepaired checks that the given shard has a healthy placement
 * on the source node and an inactive placement on the target node.
1062  */
1063 static void
EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
						 const char *targetNodeName, int32 targetNodePort)
1066 {
1067 	List *shardPlacementList =
1068 		ShardPlacementListIncludingOrphanedPlacements(shardId);
1069 
1070 	ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError(
1071 		shardPlacementList,
1072 		sourceNodeName,
1073 		sourceNodePort);
1074 	if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
1075 	{
1076 		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1077 						errmsg("source placement must be in active state")));
1078 	}
1079 
1080 	ShardPlacement *targetPlacement = SearchShardPlacementInListOrError(
1081 		shardPlacementList,
1082 		targetNodeName,
1083 		targetNodePort);
1084 	if (targetPlacement->shardState != SHARD_STATE_INACTIVE)
1085 	{
1086 		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1087 						errmsg("target placement must be in inactive state")));
1088 	}
1089 }
1090 
1091 
1092 /*
 * EnsureShardCanBeCopied checks that the given shard has a healthy placement
 * on the source node and no placement on the target node.
1095  */
1096 static void
EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
					   const char *targetNodeName, int32 targetNodePort)
1099 {
1100 	List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
1101 
1102 	ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError(
1103 		shardPlacementList,
1104 		sourceNodeName,
1105 		sourceNodePort);
1106 	if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
1107 	{
1108 		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1109 						errmsg("source placement must be in active state")));
1110 	}
1111 
1112 	ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
1113 																 targetNodeName,
1114 																 targetNodePort);
1115 	if (targetPlacement != NULL)
1116 	{
1117 		if (targetPlacement->shardState == SHARD_STATE_TO_DELETE)
1118 		{
1119 			/*
1120 			 * Trigger deletion of orphaned shards and hope that this removes
1121 			 * the shard.
1122 			 */
1123 			DropOrphanedShardsInSeparateTransaction();
1124 			shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
1125 			targetPlacement = SearchShardPlacementInList(shardPlacementList,
1126 														 targetNodeName,
1127 														 targetNodePort);
1128 
1129 			/*
1130 			 * If it still doesn't remove the shard, then we error.
1131 			 */
1132 			if (targetPlacement != NULL)
1133 			{
1134 				ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1135 								errmsg(
1136 									"shard " INT64_FORMAT
1137 									" still exists on the target node as an orphaned shard",
1138 									shardId),
1139 								errdetail(
1140 									"The existing shard is orphaned, but could not be deleted because there are still active queries on it")));
1141 			}
1142 		}
1143 		else
1144 		{
1145 			ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1146 							errmsg(
1147 								"shard " INT64_FORMAT " already exists in the target node",
1148 								shardId)));
1149 		}
1150 	}
1151 }
1152 
1153 
1154 /*
1155  * SearchShardPlacementInList searches a provided list for a shard placement with the
1156  * specified node name and port. This function returns NULL if no such
1157  * placement exists in the provided list.
1158  */
1159 ShardPlacement *
SearchShardPlacementInList(List *shardPlacementList, const char *nodeName,
						   uint32 nodePort)
1162 {
1163 	ShardPlacement *shardPlacement = NULL;
1164 	foreach_ptr(shardPlacement, shardPlacementList)
1165 	{
1166 		if (strncmp(nodeName, shardPlacement->nodeName, MAX_NODE_LENGTH) == 0 &&
1167 			nodePort == shardPlacement->nodePort)
1168 		{
1169 			return shardPlacement;
1170 		}
1171 	}
1172 	return NULL;
1173 }
1174 
1175 
1176 /*
1177  * SearchShardPlacementInListOrError searches a provided list for a shard
1178  * placement with the specified node name and port. This function throws an
1179  * error if no such placement exists in the provided list.
1180  *
1181  * This is a separate function (instead of using missingOk), so static analysis
1182  * reasons about NULL returns correctly.
1183  */
1184 ShardPlacement *
SearchShardPlacementInListOrError(List *shardPlacementList, const char *nodeName,
								  uint32 nodePort)
1187 {
1188 	ShardPlacement *placement = SearchShardPlacementInList(shardPlacementList, nodeName,
1189 														   nodePort);
1190 	if (placement == NULL)
1191 	{
1192 		ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
1193 						errmsg("could not find placement matching \"%s:%d\"",
1194 							   nodeName, nodePort),
1195 						errhint("Confirm the placement still exists and try again.")));
1196 	}
1197 	return placement;
1198 }
1199 
1200 
1201 /*
 * CopyShardCommandList generates a command list to copy the given shard placement
 * from the source node to the target node. To do this it recreates the shard
 * on the target and then copies the data. The caller can optionally skip
 * copying the data via the includeDataCopy flag.
1206  */
1207 List *
CopyShardCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
					 int32 sourceNodePort, bool includeDataCopy)
1210 {
1211 	List *copyShardToNodeCommandsList = RecreateShardDDLCommandList(
1212 		shardInterval, sourceNodeName, sourceNodePort);
1213 	if (includeDataCopy)
1214 	{
1215 		copyShardToNodeCommandsList = list_concat(
1216 			copyShardToNodeCommandsList,
1217 			CopyShardContentsCommandList(shardInterval, sourceNodeName,
1218 										 sourceNodePort));
1219 	}
1220 	return list_concat(copyShardToNodeCommandsList,
1221 					   PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
1222 														sourceNodePort));
1223 }
1224 
1225 
1226 /*
1227  * RecreateShardDDLCommandList generates a command list to recreate a shard,
1228  * but without any data init and without the post-load table creation commands.
1229  */
1230 static List *
RecreateShardDDLCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
							int32 sourceNodePort)
1233 {
1234 	int64 shardId = shardInterval->shardId;
1235 	Oid relationId = shardInterval->relationId;
1236 
1237 	List *tableRecreationCommandList = RecreateTableDDLCommandList(relationId);
1238 	return WorkerApplyShardDDLCommandList(tableRecreationCommandList, shardId);
1239 }
1240 
1241 
1242 /*
1243  * CopyShardContentsCommandList generates a command list to copy the data of the
 * given shard placement from the source node to the target node. This requires
 * the shard table to have already been created on the target node (using
 * RecreateShardDDLCommandList).
1247  */
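/*
 * The generated command roughly takes the following shape (the shard name and
 * node below are hypothetical; the exact text comes from
 * WORKER_APPEND_TABLE_TO_SHARD):
 *
 *   SELECT worker_append_table_to_shard('public.orders_102008',
 *                                       'public.orders_102008',
 *                                       'source-host', 5432);
 */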
1248 static List *
CopyShardContentsCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
							 int32 sourceNodePort)
1251 {
1252 	char *shardName = ConstructQualifiedShardName(shardInterval);
1253 	StringInfo copyShardDataCommand = makeStringInfo();
1254 	appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
1255 					 quote_literal_cstr(shardName), /* table to append */
1256 					 quote_literal_cstr(shardName), /* remote table name */
1257 					 quote_literal_cstr(sourceNodeName), /* remote host */
1258 					 sourceNodePort); /* remote port */
1259 
1260 	return list_make1(copyShardDataCommand->data);
1261 }
1262 
1263 
1264 /*
1265  * PostLoadShardCreationCommandList generates a command list to finalize the
 * creation of a shard after the data has been loaded. This creates objects
 * such as the indexes on the table.
1268  */
1269 static List *
PostLoadShardCreationCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
								 int32 sourceNodePort)
1272 {
1273 	int64 shardId = shardInterval->shardId;
1274 	Oid relationId = shardInterval->relationId;
1275 	bool includeReplicaIdentity = true;
1276 	List *indexCommandList =
1277 		GetPostLoadTableCreationCommands(relationId, true, includeReplicaIdentity);
1278 	return WorkerApplyShardDDLCommandList(indexCommandList, shardId);
1279 }
1280 
1281 
1282 /*
 * CopyShardForeignConstraintCommandList generates a command list to create the
 * foreign constraints existing on the source shard after copying it to the other node.
1285  */
1286 List *
CopyShardForeignConstraintCommandList(ShardInterval *shardInterval)
1288 {
1289 	List *colocatedShardForeignConstraintCommandList = NIL;
1290 	List *referenceTableForeignConstraintList = NIL;
1291 
1292 	CopyShardForeignConstraintCommandListGrouped(shardInterval,
1293 												 &
1294 												 colocatedShardForeignConstraintCommandList,
1295 												 &referenceTableForeignConstraintList);
1296 
1297 	return list_concat(colocatedShardForeignConstraintCommandList,
1298 					   referenceTableForeignConstraintList);
1299 }
1300 
1301 
1302 /*
 * CopyShardForeignConstraintCommandListGrouped generates command lists to
 * create the foreign constraints existing on the source shard after copying it
 * to the other node, in two separate groups: foreign constraints between hash
 * distributed tables, and foreign constraints from a hash distributed table to
 * reference tables.
1307  */
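/*
 * Each emitted command wraps the original ALTER TABLE ... ADD CONSTRAINT ...
 * FOREIGN KEY statement in a worker_apply_inter_shard_ddl_command() call so
 * that it is replayed on the worker with shard-id-extended relation names
 * (a sketch of the behavior; the exact text comes from
 * WORKER_APPLY_INTER_SHARD_DDL_COMMAND).
 */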
1308 void
CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInterval,
											 List **
											 colocatedShardForeignConstraintCommandList,
											 List **referenceTableForeignConstraintList)
1313 {
1314 	Oid relationId = shardInterval->relationId;
1315 	Oid schemaId = get_rel_namespace(relationId);
1316 	char *schemaName = get_namespace_name(schemaId);
1317 	char *escapedSchemaName = quote_literal_cstr(schemaName);
1318 	int shardIndex = 0;
1319 
1320 	List *commandList = GetReferencingForeignConstaintCommands(relationId);
1321 
1322 	/* we will only use shardIndex if there is a foreign constraint */
1323 	if (commandList != NIL)
1324 	{
1325 		shardIndex = ShardIndex(shardInterval);
1326 	}
1327 
1328 	*colocatedShardForeignConstraintCommandList = NIL;
1329 	*referenceTableForeignConstraintList = NIL;
1330 
1331 	const char *command = NULL;
1332 	foreach_ptr(command, commandList)
1333 	{
1334 		char *escapedCommand = quote_literal_cstr(command);
1335 
1336 		uint64 referencedShardId = INVALID_SHARD_ID;
1337 		bool colocatedForeignKey = false;
1338 
1339 		StringInfo applyForeignConstraintCommand = makeStringInfo();
1340 
1341 		/* we need to parse the foreign constraint command to get referenced table id */
1342 		Oid referencedRelationId = ForeignConstraintGetReferencedTableId(command);
1343 		if (referencedRelationId == InvalidOid)
1344 		{
1345 			ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1346 							errmsg("cannot create foreign key constraint"),
1347 							errdetail("Referenced relation cannot be found.")));
1348 		}
1349 
1350 		Oid referencedSchemaId = get_rel_namespace(referencedRelationId);
1351 		char *referencedSchemaName = get_namespace_name(referencedSchemaId);
1352 		char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName);
1353 
1354 		if (IsCitusTableType(referencedRelationId, REFERENCE_TABLE))
1355 		{
1356 			referencedShardId = GetFirstShardId(referencedRelationId);
1357 		}
1358 		else if (IsCitusTableType(referencedRelationId, CITUS_LOCAL_TABLE))
1359 		{
1360 			/*
1361 			 * Only reference tables and citus local tables can have foreign
1362 			 * keys to citus local tables but we already do not allow copying
1363 			 * citus local table shards and we don't try to replicate citus
1364 			 * local table shards. So, the referencing table must be a reference
1365 			 * table in this context.
1366 			 */
1367 			Assert(IsCitusTableType(relationId, REFERENCE_TABLE));
1368 
1369 			/*
1370 			 * We don't set foreign keys from reference tables to citus local
1371 			 * tables in worker shard placements of reference tables because
1372 			 * we don't have the shard placement for citus local table in worker
1373 			 * nodes.
1374 			 */
1375 			continue;
1376 		}
1377 		else
1378 		{
1379 			referencedShardId = ColocatedShardIdInRelation(referencedRelationId,
1380 														   shardIndex);
1381 
1382 			colocatedForeignKey = true;
1383 		}
1384 
1385 		appendStringInfo(applyForeignConstraintCommand,
1386 						 WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardInterval->shardId,
1387 						 escapedSchemaName, referencedShardId,
1388 						 escapedReferencedSchemaName, escapedCommand);
1389 
1390 		if (colocatedForeignKey)
1391 		{
1392 			*colocatedShardForeignConstraintCommandList = lappend(
1393 				*colocatedShardForeignConstraintCommandList,
1394 				applyForeignConstraintCommand->data);
1395 		}
1396 		else
1397 		{
1398 			*referenceTableForeignConstraintList = lappend(
1399 				*referenceTableForeignConstraintList,
1400 				applyForeignConstraintCommand->data);
1401 		}
1402 	}
1403 }
1404 
1405 
1406 /*
1407  * GetFirstShardId is a helper function which returns the first
1408  * shardId of the given distributed relation. The function doesn't
1409  * sort the shardIds, so it is mostly useful for reference tables.
1410  */
1411 uint64
GetFirstShardId(Oid relationId)
1413 {
1414 	List *shardList = LoadShardList(relationId);
1415 	uint64 *shardIdPointer = (uint64 *) linitial(shardList);
1416 
1417 	return (*shardIdPointer);
1418 }
1419 
1420 
1421 /*
 * ConstructQualifiedShardName creates the fully qualified name string of the
1423  * given shard in <schema>.<table_name>_<shard_id> format.
1424  */
1425 char *
ConstructQualifiedShardName(ShardInterval *shardInterval)
1427 {
1428 	Oid schemaId = get_rel_namespace(shardInterval->relationId);
1429 	char *schemaName = get_namespace_name(schemaId);
1430 	char *tableName = get_rel_name(shardInterval->relationId);
1431 
1432 	char *shardName = pstrdup(tableName);
1433 	AppendShardIdToName(&shardName, shardInterval->shardId);
1434 	shardName = quote_qualified_identifier(schemaName, shardName);
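	/*
	 * For example (hypothetical identifiers), shardId 102008 of the table
	 * public.orders yields "public.orders_102008", quoted as needed.
	 */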

	return shardName;
}


/*
 * RecreateTableDDLCommandList returns a list of DDL statements similar to that
 * returned by GetTableCreationCommands except that the list begins with a "DROP TABLE"
 * or "DROP FOREIGN TABLE" statement to facilitate idempotent recreation of a placement.
 */
static List *
RecreateTableDDLCommandList(Oid relationId)
{
	const char *relationName = get_rel_name(relationId);
	Oid relationSchemaId = get_rel_namespace(relationId);
	const char *relationSchemaName = get_namespace_name(relationSchemaId);
	const char *qualifiedRelationName = quote_qualified_identifier(relationSchemaName,
																   relationName);

	StringInfo dropCommand = makeStringInfo();
	char relationKind = get_rel_relkind(relationId);
	IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;

	/* build appropriate DROP command based on relation kind */
	if (RegularTable(relationId))
	{
		appendStringInfo(dropCommand, DROP_REGULAR_TABLE_COMMAND,
						 qualifiedRelationName);
	}
	else if (relationKind == RELKIND_FOREIGN_TABLE)
	{
		appendStringInfo(dropCommand, DROP_FOREIGN_TABLE_COMMAND,
						 qualifiedRelationName);
	}
	else
	{
		ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
						errmsg("repair target is not a regular, foreign or partitioned "
							   "table")));
	}
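
	/*
	 * At this point dropCommand holds something roughly like
	 * "DROP TABLE IF EXISTS public.orders CASCADE" (hypothetical table name);
	 * the shard id suffix is typically appended later, e.g. when the commands
	 * are wrapped via WorkerApplyShardDDLCommandList below.
	 */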

	List *dropCommandList = list_make1(makeTableDDLCommandString(dropCommand->data));
	List *createCommandList = GetPreLoadTableCreationCommands(relationId,
															  includeSequenceDefaults,
															  NULL);
	List *recreateCommandList = list_concat(dropCommandList, createCommandList);

	return recreateCommandList;
}


/*
 * DropColocatedShardPlacement deletes the shard placement metadata for the given shard
 * placement from pg_dist_placement, and then drops the shard table
 * from the given node. The function does this for all colocated placements.
 */
static void
DropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32 nodePort)
{
	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
	ListCell *colocatedShardCell = NULL;

	foreach(colocatedShardCell, colocatedShardList)
	{
		ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
		char *qualifiedTableName = ConstructQualifiedShardName(colocatedShard);
		StringInfo dropQuery = makeStringInfo();
		uint64 shardId = colocatedShard->shardId;
		List *shardPlacementList =
			ShardPlacementListIncludingOrphanedPlacements(shardId);
		ShardPlacement *placement =
			SearchShardPlacementInListOrError(shardPlacementList, nodeName, nodePort);

		appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);

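		/* delete the placement metadata row first, then drop the shard table on the node */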
		DeleteShardPlacementRow(placement->placementId);
		SendCommandToWorker(nodeName, nodePort, dropQuery->data);
	}
}


/*
 * MarkForDropColocatedShardPlacement marks the shard placement metadata for
 * the given shard placement to be deleted in pg_dist_placement. The function
 * does this for all colocated placements.
 */
static void
MarkForDropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32
								   nodePort)
{
	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
	ListCell *colocatedShardCell = NULL;

	foreach(colocatedShardCell, colocatedShardList)
	{
		ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
		uint64 shardId = colocatedShard->shardId;
		List *shardPlacementList =
			ShardPlacementListIncludingOrphanedPlacements(shardId);
		ShardPlacement *placement =
			SearchShardPlacementInListOrError(shardPlacementList, nodeName, nodePort);

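		/*
		 * Only the metadata is updated here; placements in the
		 * SHARD_STATE_TO_DELETE state are expected to be dropped later by the
		 * deferred shard cleanup logic rather than immediately.
		 */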
		UpdateShardPlacementState(placement->placementId, SHARD_STATE_TO_DELETE);
	}
}


/*
 * UpdateColocatedShardPlacementMetadataOnWorkers updates the metadata about the
 * placements of the given shard and its colocated shards by changing the nodename and
 * nodeport of the shards from the source nodename/port to the target nodename/port.
 *
 * Note that the function does nothing if the given shard belongs to a non-mx table.
 */
static void
UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId,
											   char *sourceNodeName, int32 sourceNodePort,
											   char *targetNodeName, int32 targetNodePort)
{
	ShardInterval *shardInterval = LoadShardInterval(shardId);
	ListCell *colocatedShardCell = NULL;
	bool shouldSyncMetadata = ShouldSyncTableMetadata(shardInterval->relationId);

	if (!shouldSyncMetadata)
	{
		return;
	}

	uint32 sourceGroupId = GroupForNode(sourceNodeName, sourceNodePort);
	uint32 targetGroupId = GroupForNode(targetNodeName, targetNodePort);
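
	/*
	 * Placement metadata on workers is keyed by group id rather than by node
	 * name and port, hence the translation above.
	 */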

	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);

	/* iterate through the colocated shards and update the metadata for each */
	foreach(colocatedShardCell, colocatedShardList)
	{
		ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
		StringInfo updateCommand = makeStringInfo();

		appendStringInfo(updateCommand,
						 "SELECT citus_internal_update_placement_metadata(%ld, %d, %d)",
						 colocatedShard->shardId,
						 sourceGroupId, targetGroupId);
		SendCommandToWorkersWithMetadata(updateCommand->data);
	}
}


/*
 * WorkerApplyShardDDLCommandList wraps all DDL commands in ddlCommandList
 * in a call to worker_apply_shard_ddl_command to apply the DDL command to
 * the shard specified by shardId.
 */
List *
WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId)
{
	List *applyDDLCommandList = NIL;

	TableDDLCommand *ddlCommand = NULL;
	foreach_ptr(ddlCommand, ddlCommandList)
	{
		Assert(CitusIsA(ddlCommand, TableDDLCommand));
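		/*
		 * Illustrative example (hypothetical names): a "CREATE INDEX ... ON
		 * public.orders ..." statement is wrapped into a
		 * worker_apply_shard_ddl_command() call so that the worker appends
		 * the shard id (e.g. _102008) to the relation names before executing
		 * the DDL against the shard.
		 */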
		char *applyDDLCommand = GetShardedTableDDLCommand(ddlCommand, shardId, NULL);
		applyDDLCommandList = lappend(applyDDLCommandList, applyDDLCommand);
	}

	return applyDDLCommandList;
}