1 /*-------------------------------------------------------------------------
2 *
3 * repair_shards.c
4 *
5 * This file contains functions to repair unhealthy shard placements using data
6 * from healthy ones.
7 *
8 * Copyright (c) Citus Data, Inc.
9 *
10 *-------------------------------------------------------------------------
11 */
12
13 #include "postgres.h"
14 #include "fmgr.h"
15 #include "miscadmin.h"
16
17 #include <string.h>
18 #include <sys/statvfs.h>
19
20 #include "access/htup_details.h"
21 #include "catalog/pg_class.h"
22 #include "catalog/pg_enum.h"
23 #include "distributed/citus_ruleutils.h"
24 #include "distributed/colocation_utils.h"
25 #include "distributed/commands.h"
26 #include "distributed/connection_management.h"
27 #include "distributed/distributed_planner.h"
28 #include "distributed/listutils.h"
29 #include "distributed/shard_cleaner.h"
30 #include "distributed/coordinator_protocol.h"
31 #include "distributed/repair_shards.h"
32 #include "distributed/metadata_cache.h"
33 #include "distributed/metadata_sync.h"
34 #include "distributed/multi_join_order.h"
35 #include "distributed/multi_partitioning_utils.h"
36 #include "distributed/reference_table_utils.h"
37 #include "distributed/remote_commands.h"
38 #include "distributed/resource_lock.h"
39 #include "distributed/worker_manager.h"
40 #include "distributed/worker_protocol.h"
41 #include "distributed/worker_transaction.h"
42 #include "lib/stringinfo.h"
43 #include "nodes/pg_list.h"
44 #include "storage/lmgr.h"
45 #include "storage/lock.h"
47 #include "utils/builtins.h"
48 #include "utils/elog.h"
49 #include "utils/errcodes.h"
50 #include "utils/lsyscache.h"
51 #include "utils/palloc.h"
52 #include "utils/rel.h"
53 #include "utils/syscache.h"
54
55 /* local function forward declarations */
56 static void ErrorIfTableCannotBeReplicated(Oid relationId);
57 static void RepairShardPlacement(int64 shardId, const char *sourceNodeName,
58 int32 sourceNodePort, const char *targetNodeName,
59 int32 targetNodePort);
60 static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
61 int32 sourceNodePort, char *targetNodeName,
62 int32 targetNodePort,
63 char shardReplicationMode);
64 static void CopyShardTables(List *shardIntervalList, char *sourceNodeName,
65 int32 sourceNodePort, char *targetNodeName,
66 int32 targetNodePort, bool useLogicalReplication);
67 static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
68 int32 sourceNodePort,
69 char *targetNodeName, int32 targetNodePort);
70 static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval,
71 const char *sourceNodeName,
72 int32 sourceNodePort);
73 static void EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName,
74 int32 sourceNodePort, const char *targetNodeName,
75 int32 targetNodePort);
76 static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName,
77 int32 sourceNodePort, const char *targetNodeName,
78 int32 targetNodePort);
79 static List * RecreateTableDDLCommandList(Oid relationId);
80 static void EnsureTableListOwner(List *tableIdList);
81 static void EnsureTableListSuitableForReplication(List *tableIdList);
82
83 static void DropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName,
84 int32 nodePort);
85 static void MarkForDropColocatedShardPlacement(ShardInterval *shardInterval,
86 char *nodeName, int32 nodePort);
87 static void UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId,
88 char *sourceNodeName,
89 int32 sourceNodePort,
90 char *targetNodeName,
91 int32 targetNodePort);
92 static void CheckSpaceConstraints(MultiConnection *connection,
93 uint64 colocationSizeInBytes);
94 static void EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
95 char *sourceNodeName, uint32 sourceNodePort,
96 char *targetNodeName, uint32
97 targetNodePort);
98 static List * RecreateShardDDLCommandList(ShardInterval *shardInterval,
99 const char *sourceNodeName,
100 int32 sourceNodePort);
101 static List * CopyShardContentsCommandList(ShardInterval *shardInterval,
102 const char *sourceNodeName,
103 int32 sourceNodePort);
104 static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval,
105 const char *sourceNodeName,
106 int32 sourceNodePort);
107
108
109 /* declarations for dynamic loading */
110 PG_FUNCTION_INFO_V1(citus_copy_shard_placement);
111 PG_FUNCTION_INFO_V1(master_copy_shard_placement);
112 PG_FUNCTION_INFO_V1(citus_move_shard_placement);
113 PG_FUNCTION_INFO_V1(master_move_shard_placement);
114
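/*
 * When true, the source placements are not dropped right after a shard move;
 * instead they are marked with SHARD_STATE_TO_DELETE and left to deferred
 * shard cleanup (see MarkForDropColocatedShardPlacement below).
 */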
115 bool DeferShardDeleteOnMove = false;
116
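/*
 * DesiredPercentFreeAfterMove is the percentage of disk space that should still
 * be available on the target node after a shard move (exposed via the
 * citus.desired_percent_disk_available_after_move setting referenced below),
 * and CheckAvailableSpaceBeforeMove determines whether that check runs at all.
 */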
117 double DesiredPercentFreeAfterMove = 10;
118 bool CheckAvailableSpaceBeforeMove = true;
119
120
121 /*
 * citus_copy_shard_placement implements a user-facing UDF to repair data from
 * a healthy (source) node to an inactive (target) node. To accomplish this it
 * entirely recreates the table structure before copying all data. During this
 * time all modifications to the shard are paused. After a successful repair,
 * the inactive placement is marked healthy and modifications may continue. If
 * the repair fails at any point, this function throws an error, leaving the
 * node in an unhealthy state. Note that citus_copy_shard_placement copies the
 * given shard along with its co-located shards.
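 *
 * A minimal usage sketch (shard id, node names and ports are illustrative),
 * matching the argument order read below:
 *
 *   SELECT citus_copy_shard_placement(102008, 'source-host', 5432,
 *                                     'target-host', 5432, false,
 *                                     'block_writes');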
130 */
131 Datum
citus_copy_shard_placement(PG_FUNCTION_ARGS)
133 {
134 CheckCitusVersion(ERROR);
135 EnsureCoordinator();
136
137 int64 shardId = PG_GETARG_INT64(0);
138 text *sourceNodeNameText = PG_GETARG_TEXT_P(1);
139 int32 sourceNodePort = PG_GETARG_INT32(2);
140 text *targetNodeNameText = PG_GETARG_TEXT_P(3);
141 int32 targetNodePort = PG_GETARG_INT32(4);
142 bool doRepair = PG_GETARG_BOOL(5);
143 Oid shardReplicationModeOid = PG_GETARG_OID(6);
144
145 char *sourceNodeName = text_to_cstring(sourceNodeNameText);
146 char *targetNodeName = text_to_cstring(targetNodeNameText);
147
148 char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
149 if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL)
150 {
151 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
152 errmsg("the force_logical transfer mode is currently "
153 "unsupported")));
154 }
155
156 ShardInterval *shardInterval = LoadShardInterval(shardId);
157 ErrorIfTableCannotBeReplicated(shardInterval->relationId);
158
159 if (doRepair)
160 {
161 RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
162 targetNodePort);
163 }
164 else
165 {
166 ReplicateColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
167 targetNodeName, targetNodePort,
168 shardReplicationMode);
169 }
170
171 PG_RETURN_VOID();
172 }
173
174
175 /*
 * master_copy_shard_placement is a wrapper function for the old UDF name.
177 */
178 Datum
master_copy_shard_placement(PG_FUNCTION_ARGS)
180 {
181 return citus_copy_shard_placement(fcinfo);
182 }
183
184
185 /*
186 * ShardListSizeInBytes returns the size in bytes of a set of shard tables.
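 * The size is obtained by running a single query on the worker that sums the
 * total relation size of the shard placements (the exact query text is
 * produced by GenerateSizeQueryOnMultiplePlacements below).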
187 */
188 uint64
ShardListSizeInBytes(List *shardList, char *workerNodeName, uint32
                     workerNodePort)
191 {
192 uint32 connectionFlag = 0;
193
194 /* we skip child tables of a partitioned table if this boolean variable is true */
195 bool optimizePartitionCalculations = true;
196 StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(shardList,
197 TOTAL_RELATION_SIZE,
198 optimizePartitionCalculations);
199
200 MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
201 workerNodePort);
202 PGresult *result = NULL;
203 int queryResult = ExecuteOptionalRemoteCommand(connection, tableSizeQuery->data,
204 &result);
205
206 if (queryResult != RESPONSE_OKAY)
207 {
208 ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
209 errmsg("cannot get the size because of a connection error")));
210 }
211
212 List *sizeList = ReadFirstColumnAsText(result);
213 if (list_length(sizeList) != 1)
214 {
215 ereport(ERROR, (errmsg(
216 "received wrong number of rows from worker, expected 1 received %d",
217 list_length(sizeList))));
218 }
219
220 StringInfo totalSizeStringInfo = (StringInfo) linitial(sizeList);
221 char *totalSizeString = totalSizeStringInfo->data;
222 uint64 totalSize = SafeStringToUint64(totalSizeString);
223
224 PQclear(result);
225 ForgetResults(connection);
226
227 return totalSize;
228 }
229
230
231 /*
232 * CheckSpaceConstraints checks there is enough space to place the colocation
233 * on the node that the connection is connected to.
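 *
 * For example (illustrative numbers): with a 100 GB disk, 20 GB currently
 * available, a 5 GB colocation group and the default
 * citus.desired_percent_disk_available_after_move of 10, the node would have
 * 15 GB available after the move, which is above the desired 10 GB, so the
 * check passes.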
234 */
235 static void
CheckSpaceConstraints(MultiConnection *connection, uint64 colocationSizeInBytes)
237 {
238 uint64 diskAvailableInBytes = 0;
239 uint64 diskSizeInBytes = 0;
240 bool success =
241 GetNodeDiskSpaceStatsForConnection(connection, &diskAvailableInBytes,
242 &diskSizeInBytes);
243 if (!success)
244 {
245 ereport(ERROR, (errmsg("Could not fetch disk stats for node: %s-%d",
246 connection->hostname, connection->port)));
247 }
248
249 uint64 diskAvailableInBytesAfterShardMove = 0;
250 if (diskAvailableInBytes < colocationSizeInBytes)
251 {
252 /*
253 * even though the space will be less than "0", we set it to 0 for convenience.
254 */
255 diskAvailableInBytes = 0;
256 }
257 else
258 {
259 diskAvailableInBytesAfterShardMove = diskAvailableInBytes - colocationSizeInBytes;
260 }
261 uint64 desiredNewDiskAvailableInBytes = diskSizeInBytes *
262 (DesiredPercentFreeAfterMove / 100);
263 if (diskAvailableInBytesAfterShardMove < desiredNewDiskAvailableInBytes)
264 {
265 ereport(ERROR, (errmsg("not enough empty space on node if the shard is moved, "
266 "actual available space after move will be %ld bytes, "
"desired available space after move is %ld bytes, "
268 "estimated size increase on node after move is %ld bytes.",
269 diskAvailableInBytesAfterShardMove,
270 desiredNewDiskAvailableInBytes, colocationSizeInBytes),
271 errhint(
272 "consider lowering citus.desired_percent_disk_available_after_move.")));
273 }
274 }
275
276
277 /*
 * citus_move_shard_placement moves the given shard (and its co-located shards) from
 * one node to another. To accomplish this it entirely recreates the table structure
 * before copying all data.
 *
 * After that, there are two different paths. The first is a blocking shard move, in
 * the sense that all modifications to the shard are paused for the duration of the
 * move. The second relies on logical replication, meaning that writes are blocked
 * only for a very short duration, essentially only while the metadata is being
 * updated. This option is currently only available in Citus Enterprise.
 *
 * After a successful move, the shard placements on the source node are deleted. If
 * the move fails at any point, this function throws an error, leaving the cluster
 * unchanged on both the source node and the target node.
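 *
 * A minimal usage sketch (shard id, node names and ports are illustrative):
 *
 *   SELECT citus_move_shard_placement(102008, 'source-host', 5432,
 *                                     'target-host', 5432, 'block_writes');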
291 */
292 Datum
citus_move_shard_placement(PG_FUNCTION_ARGS)
294 {
295 CheckCitusVersion(ERROR);
296 EnsureCoordinator();
297
298 int64 shardId = PG_GETARG_INT64(0);
299 char *sourceNodeName = text_to_cstring(PG_GETARG_TEXT_P(1));
300 int32 sourceNodePort = PG_GETARG_INT32(2);
301 char *targetNodeName = text_to_cstring(PG_GETARG_TEXT_P(3));
302 int32 targetNodePort = PG_GETARG_INT32(4);
303 Oid shardReplicationModeOid = PG_GETARG_OID(5);
304
305
306 ListCell *colocatedTableCell = NULL;
307 ListCell *colocatedShardCell = NULL;
308
309 Oid relationId = RelationIdForShard(shardId);
310 ErrorIfMoveUnsupportedTableType(relationId);
311 ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort);
312
313 ShardInterval *shardInterval = LoadShardInterval(shardId);
314 Oid distributedTableId = shardInterval->relationId;
315
316 List *colocatedTableList = ColocatedTableList(distributedTableId);
317 List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
318
319 foreach(colocatedTableCell, colocatedTableList)
320 {
321 Oid colocatedTableId = lfirst_oid(colocatedTableCell);
322 char relationKind = '\0';
323
324 /* check that user has owner rights in all co-located tables */
325 EnsureTableOwner(colocatedTableId);
326
327 /*
328 * Block concurrent DDL / TRUNCATE commands on the relation. Similarly,
329 * block concurrent citus_move_shard_placement() on any shard of
330 * the same relation. This is OK for now since we're executing shard
331 * moves sequentially anyway.
332 */
333 LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
334
335 relationKind = get_rel_relkind(colocatedTableId);
336 if (relationKind == RELKIND_FOREIGN_TABLE)
337 {
338 char *relationName = get_rel_name(colocatedTableId);
339 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
340 errmsg("cannot repair shard"),
341 errdetail("Table %s is a foreign table. Repairing "
342 "shards backed by foreign tables is "
343 "not supported.", relationName)));
344 }
345 }
346
347 /* we sort colocatedShardList so that lock operations will not cause any deadlocks */
348 colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
349 foreach(colocatedShardCell, colocatedShardList)
350 {
351 ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
352 uint64 colocatedShardId = colocatedShard->shardId;
353
354 EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort,
355 targetNodeName, targetNodePort);
356 }
357
358 char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
359 if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL)
360 {
361 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
362 errmsg("the force_logical transfer mode is currently "
363 "unsupported")));
364 }
365
366 EnsureEnoughDiskSpaceForShardMove(colocatedShardList, sourceNodeName, sourceNodePort,
367 targetNodeName, targetNodePort);
368
369 BlockWritesToShardList(colocatedShardList);
370
371 /*
 * CopyShardTables() copies the given shard along with its co-located
 * shards.
374 */
375 bool useLogicalReplication = false;
376 CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
377 targetNodePort, useLogicalReplication);
378
379 ShardInterval *colocatedShard = NULL;
380 foreach_ptr(colocatedShard, colocatedShardList)
381 {
382 uint64 colocatedShardId = colocatedShard->shardId;
383 uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
384 uint64 placementId = GetNextPlacementId();
385
386 InsertShardPlacementRow(colocatedShardId, placementId,
387 SHARD_STATE_ACTIVE, ShardLength(colocatedShardId),
388 groupId);
389 }
390
/* since this is a move operation, we remove the shards from the source node after the copy */
392 if (DeferShardDeleteOnMove)
393 {
394 MarkForDropColocatedShardPlacement(shardInterval, sourceNodeName, sourceNodePort);
395 }
396 else
397 {
398 DropColocatedShardPlacement(shardInterval, sourceNodeName, sourceNodePort);
399 }
400
401 UpdateColocatedShardPlacementMetadataOnWorkers(shardId, sourceNodeName,
402 sourceNodePort, targetNodeName,
403 targetNodePort);
404
405 PG_RETURN_VOID();
406 }
407
408
409 /*
410 * EnsureEnoughDiskSpaceForShardMove checks that there is enough space for
411 * shard moves of the given colocated shard list from source node to target node.
412 * It tries to clean up old shard placements to ensure there is enough space.
413 */
414 static void
EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
                                  char *sourceNodeName, uint32 sourceNodePort,
                                  char *targetNodeName, uint32 targetNodePort)
418 {
419 if (!CheckAvailableSpaceBeforeMove)
420 {
421 return;
422 }
423 uint64 colocationSizeInBytes = ShardListSizeInBytes(colocatedShardList,
424 sourceNodeName,
425 sourceNodePort);
426
427 uint32 connectionFlag = 0;
428 MultiConnection *connection = GetNodeConnection(connectionFlag, targetNodeName,
429 targetNodePort);
430 CheckSpaceConstraints(connection, colocationSizeInBytes);
431 }
432
433
434 /*
 * ErrorIfTargetNodeIsNotSafeToMove throws an error if the target node is not
 * eligible for receiving moved shards.
437 */
438 void
ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int targetNodePort)
440 {
441 WorkerNode *workerNode = FindWorkerNode(targetNodeName, targetNodePort);
442 if (workerNode == NULL)
443 {
444 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
445 errmsg("Moving shards to a non-existing node is not supported"),
446 errhint(
447 "Add the target node via SELECT citus_add_node('%s', %d);",
448 targetNodeName, targetNodePort)));
449 }
450
451 if (!workerNode->isActive)
452 {
453 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
454 errmsg("Moving shards to a non-active node is not supported"),
455 errhint(
456 "Activate the target node via SELECT citus_activate_node('%s', %d);",
457 targetNodeName, targetNodePort)));
458 }
459
460 if (!workerNode->shouldHaveShards)
461 {
462 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
463 errmsg("Moving shards to a node that shouldn't have a shard is "
464 "not supported"),
465 errhint("Allow shards on the target node via "
466 "SELECT * FROM citus_set_node_property('%s', %d, 'shouldhaveshards', true);",
467 targetNodeName, targetNodePort)));
468 }
469
470 if (!NodeIsPrimary(workerNode))
471 {
472 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
473 errmsg("Moving shards to a secondary (e.g., replica) node is "
474 "not supported")));
475 }
476 }
477
478
479 /*
480 * master_move_shard_placement is a wrapper around citus_move_shard_placement.
481 */
482 Datum
master_move_shard_placement(PG_FUNCTION_ARGS)
484 {
485 return citus_move_shard_placement(fcinfo);
486 }
487
488
489 /*
 * ErrorIfMoveUnsupportedTableType is a helper function for the rebalance_table_shards
 * and citus_move_shard_placement UDFs to error out if the relation with relationId
 * is not a distributed table.
493 */
494 void
ErrorIfMoveUnsupportedTableType(Oid relationId)
496 {
497 if (IsCitusTableType(relationId, DISTRIBUTED_TABLE))
498 {
499 return;
500 }
501
502 char *qualifiedRelationName = generate_qualified_relation_name(relationId);
503 if (!IsCitusTable(relationId))
504 {
505 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
506 errmsg("table %s is a regular postgres table, you can "
507 "only move shards of a citus table",
508 qualifiedRelationName)));
509 }
510 else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
511 {
512 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
513 errmsg("table %s is a local table, moving shard of "
514 "a local table added to metadata is currently "
515 "not supported", qualifiedRelationName)));
516 }
517 else if (IsCitusTableType(relationId, REFERENCE_TABLE))
518 {
519 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
520 errmsg("table %s is a reference table, moving shard of "
521 "a reference table is not supported",
522 qualifiedRelationName)));
523 }
524 }
525
526
527 /*
 * BlockWritesToShardList blocks writes to all shards in the given shard
 * list. The function assumes that all the shards in the list are colocated.
530 */
531 void
BlockWritesToShardList(List *shardList)
533 {
534 ShardInterval *shard = NULL;
535 foreach_ptr(shard, shardList)
536 {
537 /*
538 * We need to lock the referenced reference table metadata to avoid
539 * asynchronous shard copy in case of cascading DML operations.
540 */
541 LockReferencedReferenceShardDistributionMetadata(shard->shardId,
542 ExclusiveLock);
543
544 LockShardDistributionMetadata(shard->shardId, ExclusiveLock);
545 }
546
547 /* following code relies on the list to have at least one shard */
548 if (list_length(shardList) == 0)
549 {
550 return;
551 }
552
553 /*
554 * Since the function assumes that the input shards are colocated,
555 * calculating shouldSyncMetadata for a single table is sufficient.
556 */
557 ShardInterval *firstShardInterval = (ShardInterval *) linitial(shardList);
558 Oid firstDistributedTableId = firstShardInterval->relationId;
559
560 bool shouldSyncMetadata = ShouldSyncTableMetadata(firstDistributedTableId);
561 if (shouldSyncMetadata)
562 {
563 LockShardListMetadataOnWorkers(ExclusiveLock, shardList);
564 }
565 }
566
567
568 /*
569 * ErrorIfTableCannotBeReplicated function errors out if the given table is not suitable
570 * for its shard being replicated. There are 2 cases in which shard replication is not
571 * allowed:
572 *
 * 1) MX tables, since RF=1 is a must for MX tables
574 * 2) Reference tables, since the shard should already exist in all workers
575 */
576 static void
ErrorIfTableCannotBeReplicated(Oid relationId)
578 {
579 /*
580 * Note that ShouldSyncTableMetadata() returns true for both MX tables
581 * and reference tables.
582 */
583 bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId);
584 if (!shouldSyncMetadata)
585 {
586 return;
587 }
588
589 CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
590 char *relationName = get_rel_name(relationId);
591
592 if (IsCitusTableTypeCacheEntry(tableEntry, CITUS_LOCAL_TABLE))
593 {
594 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
595 (errmsg("Table %s is a local table. Replicating "
596 "shard of a local table added to metadata "
597 "currently is not supported",
598 quote_literal_cstr(relationName)))));
599 }
600
601 /*
 * ShouldSyncTableMetadata() also returns true for reference tables, but we
 * don't want to error in that case since reference tables aren't
 * automatically replicated to active nodes with no shards, and
 * master_copy_shard_placement() can be used to create placements in
 * such nodes.
607 */
608 if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING)
609 {
610 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
611 (errmsg("Table %s is streaming replicated. Shards "
612 "of streaming replicated tables cannot "
613 "be copied", quote_literal_cstr(relationName)))));
614 }
615 }
616
617
618 /*
619 * LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum
620 * values to a char.
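 * For example, the 'block_writes' enum label maps to TRANSFER_MODE_BLOCK_WRITES.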
621 */
622 char
LookupShardTransferMode(Oid shardReplicationModeOid)
624 {
625 char shardReplicationMode = 0;
626
627 Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardReplicationModeOid);
628 char *enumLabel = DatumGetCString(enumLabelDatum);
629
630 if (strncmp(enumLabel, "auto", NAMEDATALEN) == 0)
631 {
632 shardReplicationMode = TRANSFER_MODE_AUTOMATIC;
633 }
634 else if (strncmp(enumLabel, "force_logical", NAMEDATALEN) == 0)
635 {
636 shardReplicationMode = TRANSFER_MODE_FORCE_LOGICAL;
637 }
638 else if (strncmp(enumLabel, "block_writes", NAMEDATALEN) == 0)
639 {
640 shardReplicationMode = TRANSFER_MODE_BLOCK_WRITES;
641 }
642 else
643 {
644 ereport(ERROR, (errmsg("invalid label for enum: %s", enumLabel)));
645 }
646
647 return shardReplicationMode;
648 }
649
650
651 /*
 * RepairShardPlacement repairs the given shard from a source node to a target node.
 * This function is not co-location aware. It only repairs the given shard.
654 */
655 static void
RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
                     const char *targetNodeName, int32 targetNodePort)
658 {
659 ShardInterval *shardInterval = LoadShardInterval(shardId);
660 Oid distributedTableId = shardInterval->relationId;
661
662 char relationKind = get_rel_relkind(distributedTableId);
663 char *tableOwner = TableOwner(shardInterval->relationId);
664
665 /* prevent table from being dropped */
666 LockRelationOid(distributedTableId, AccessShareLock);
667
668 EnsureTableOwner(distributedTableId);
669
670 if (relationKind == RELKIND_FOREIGN_TABLE)
671 {
672 char *relationName = get_rel_name(distributedTableId);
673 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
674 errmsg("cannot repair shard"),
675 errdetail("Table %s is a foreign table. Repairing "
676 "shards backed by foreign tables is "
677 "not supported.", relationName)));
678 }
679
680 /*
681 * Let's not allow repairing partitions to prevent any edge cases.
682 * We're already not allowing any kind of modifications on the partitions
683 * so their placements are not likely to be marked as INVALID. The only
684 * possible case to mark placement of a partition as invalid is
685 * "ALTER TABLE parent_table DETACH PARTITION partition_table". But,
686 * given that the table would become a regular distributed table if the
687 * command succeeds, we're OK since the regular distributed tables can
688 * be repaired later on.
689 */
690 EnsurePartitionTableNotReplicated(distributedTableId);
691
692 /*
693 * We take a lock on the referenced table if there is a foreign constraint
694 * during the copy procedure. If we do not block DMLs on the referenced
695 * table, we cannot avoid the inconsistency between the two copies of the
696 * data. Currently, we do not support replication factor > 1 on the tables
697 * with foreign constraints, so this command will fail for this case anyway.
698 * However, it is taken as a precaution in case we support it one day.
699 */
700 LockReferencedReferenceShardDistributionMetadata(shardId, ExclusiveLock);
701
702 /*
703 * We plan to move the placement to the healthy state, so we need to grab a shard
704 * metadata lock (in exclusive mode).
705 */
706 LockShardDistributionMetadata(shardId, ExclusiveLock);
707
708 /*
 * For shard repair, there should be a healthy placement on the source node and
 * an unhealthy placement on the target node.
711 */
712 EnsureShardCanBeRepaired(shardId, sourceNodeName, sourceNodePort, targetNodeName,
713 targetNodePort);
714
715 /*
716 * If the shard belongs to a partitioned table, we need to load the data after
 * creating the partitions and the partitioning hierarchy.
718 */
719 bool partitionedTable = PartitionedTableNoLock(distributedTableId);
720 bool includeData = !partitionedTable;
721
722 /* we generate necessary commands to recreate the shard in target node */
723 List *ddlCommandList =
724 CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort, includeData);
725
726 List *foreignConstraintCommandList = CopyShardForeignConstraintCommandList(
727 shardInterval);
728 ddlCommandList = list_concat(ddlCommandList, foreignConstraintCommandList);
729
730 /*
731 * CopyShardCommandList() drops the table which cascades to partitions if the
732 * table is a partitioned table. This means that we need to create both parent
733 * table and its partitions.
734 *
735 * We also skipped copying the data, so include it here.
736 */
737 if (partitionedTable)
738 {
739 char *shardName = ConstructQualifiedShardName(shardInterval);
740 StringInfo copyShardDataCommand = makeStringInfo();
741
742 List *partitionCommandList =
743 CopyPartitionShardsCommandList(shardInterval, sourceNodeName, sourceNodePort);
744 ddlCommandList = list_concat(ddlCommandList, partitionCommandList);
745
746 /* finally copy the data as well */
747 appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
748 quote_literal_cstr(shardName), /* table to append */
749 quote_literal_cstr(shardName), /* remote table name */
750 quote_literal_cstr(sourceNodeName), /* remote host */
751 sourceNodePort); /* remote port */
752 ddlCommandList = lappend(ddlCommandList, copyShardDataCommand->data);
753 }
754
755 EnsureNoModificationsHaveBeenDone();
756 SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, tableOwner,
757 ddlCommandList);
758
/* after a successful repair, we update the shard state to healthy */
760 List *placementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
761 ShardPlacement *placement = SearchShardPlacementInListOrError(placementList,
762 targetNodeName,
763 targetNodePort);
764 UpdateShardPlacementState(placement->placementId, SHARD_STATE_ACTIVE);
765 }
766
767
768 /*
769 * ReplicateColocatedShardPlacement replicates the given shard and its
770 * colocated shards from a source node to target node.
771 */
772 static void
ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
                                 int32 sourceNodePort, char *targetNodeName,
                                 int32 targetNodePort, char shardReplicationMode)
776 {
777 ShardInterval *shardInterval = LoadShardInterval(shardId);
778 Oid distributedTableId = shardInterval->relationId;
779
780 List *colocatedTableList = ColocatedTableList(distributedTableId);
781 List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
782
783 EnsureTableListOwner(colocatedTableList);
784 EnsureTableListSuitableForReplication(colocatedTableList);
785
786 /*
787 * We sort shardIntervalList so that lock operations will not cause any
788 * deadlocks.
789 */
790 colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
791
792 BlockWritesToShardList(colocatedShardList);
793
794 ShardInterval *colocatedShard = NULL;
795 foreach_ptr(colocatedShard, colocatedShardList)
796 {
797 uint64 colocatedShardId = colocatedShard->shardId;
798
799 /*
 * For shard copy, there should be a healthy placement on the source node and
 * no placement on the target node.
802 */
803 EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort,
804 targetNodeName, targetNodePort);
805 }
806
807 if (!IsCitusTableType(distributedTableId, REFERENCE_TABLE))
808 {
809 /*
810 * When copying a shard to a new node, we should first ensure that reference
811 * tables are present such that joins work immediately after copying the shard.
812 * When copying a reference table, we are probably trying to achieve just that.
813 *
 * Since this is a long-running operation we do this after the error checks, but
815 * before taking metadata locks.
816 */
817 EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode);
818 }
819
820 bool useLogicalReplication = false;
821 CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort,
822 targetNodeName, targetNodePort, useLogicalReplication);
823
824 /*
825 * Finally insert the placements to pg_dist_placement and sync it to the
826 * metadata workers.
827 */
828 foreach_ptr(colocatedShard, colocatedShardList)
829 {
830 uint64 colocatedShardId = colocatedShard->shardId;
831 uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
832 uint64 placementId = GetNextPlacementId();
833
834 InsertShardPlacementRow(colocatedShardId, placementId,
835 SHARD_STATE_ACTIVE, ShardLength(colocatedShardId),
836 groupId);
837
838 if (ShouldSyncTableMetadata(colocatedShard->relationId))
839 {
840 char *placementCommand = PlacementUpsertCommand(colocatedShardId, placementId,
841 SHARD_STATE_ACTIVE, 0,
842 groupId);
843
844 SendCommandToWorkersWithMetadata(placementCommand);
845 }
846 }
847 }
848
849
850 /*
 * EnsureTableListOwner ensures that the current user owns the given tables.
 * Superusers are regarded as owners.
853 */
854 static void
EnsureTableListOwner(List *tableIdList)
856 {
857 Oid tableId = InvalidOid;
858 foreach_oid(tableId, tableIdList)
859 {
860 EnsureTableOwner(tableId);
861 }
862 }
863
864
865 /*
 * EnsureTableListSuitableForReplication errors out if the given tables are not
 * suitable for replication.
868 */
869 static void
EnsureTableListSuitableForReplication(List *tableIdList)
871 {
872 Oid tableId = InvalidOid;
873 foreach_oid(tableId, tableIdList)
874 {
875 char relationKind = get_rel_relkind(tableId);
876 if (relationKind == RELKIND_FOREIGN_TABLE)
877 {
878 char *relationName = get_rel_name(tableId);
879 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
880 errmsg("cannot replicate shard"),
881 errdetail("Table %s is a foreign table. Replicating "
882 "shards backed by foreign tables is "
883 "not supported.", relationName)));
884 }
885
886 List *foreignConstraintCommandList =
887 GetReferencingForeignConstaintCommands(tableId);
888
889 if (foreignConstraintCommandList != NIL &&
890 IsCitusTableType(tableId, DISTRIBUTED_TABLE))
891 {
892 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
893 errmsg("cannot replicate shards with foreign keys")));
894 }
895 }
896 }
897
898
899 /*
 * CopyShardTables copies a shard along with its co-located shards from a source
 * node to a target node. It does not make any checks about the state of the shards.
 * It is the caller's responsibility to make those checks if they are necessary.
903 */
904 static void
CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort,
                char *targetNodeName, int32 targetNodePort, bool useLogicalReplication)
907 {
908 if (list_length(shardIntervalList) < 1)
909 {
910 return;
911 }
912
913 if (useLogicalReplication)
914 {
915 /* only supported in Citus enterprise */
916 }
917 else
918 {
919 CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort,
920 targetNodeName, targetNodePort);
921 }
922 }
923
924
925 /*
 * CopyShardTablesViaBlockWrites copies a shard along with its co-located shards
 * from a source node to a target node via the COPY command. While the command is
 * in progress, modifications on the source node are blocked.
929 */
930 static void
CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
                              int32 sourceNodePort, char *targetNodeName,
                              int32 targetNodePort)
934 {
935 MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
936 "CopyShardTablesViaBlockWrites",
937 ALLOCSET_DEFAULT_SIZES);
938 MemoryContext oldContext = MemoryContextSwitchTo(localContext);
939
940 /* iterate through the colocated shards and copy each */
941 ShardInterval *shardInterval = NULL;
942 foreach_ptr(shardInterval, shardIntervalList)
943 {
944 /*
945 * For each shard we first create the shard table in a separate
946 * transaction and then we copy the data and create the indexes in a
947 * second separate transaction. The reason we don't do both in a single
948 * transaction is so we can see the size of the new shard growing
949 * during the copy when we run get_rebalance_progress in another
950 * session. If we wouldn't split these two phases up, then the table
951 * wouldn't be visible in the session that get_rebalance_progress uses.
952 * So get_rebalance_progress would always report its size as 0.
953 */
954 List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
955 sourceNodePort);
956 char *tableOwner = TableOwner(shardInterval->relationId);
957 SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
958 tableOwner, ddlCommandList);
959
960 ddlCommandList = NIL;
961
962 /*
963 * Skip copying data for partitioned tables, because they contain no
964 * data themselves. Their partitions do contain data, but those are
 * different colocated shards that will be copied separately.
966 */
967 if (!PartitionedTable(shardInterval->relationId))
968 {
969 ddlCommandList = CopyShardContentsCommandList(shardInterval, sourceNodeName,
970 sourceNodePort);
971 }
972 ddlCommandList = list_concat(
973 ddlCommandList,
974 PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
975 sourceNodePort));
976 SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
977 tableOwner, ddlCommandList);
978
979 MemoryContextReset(localContext);
980 }
981
982 /*
983 * Once all shards are created, we can recreate relationships between shards.
984 *
985 * Iterate through the colocated shards and create the foreign constraints and
986 * attach child tables to their parents in a partitioning hierarchy.
987 */
988 foreach_ptr(shardInterval, shardIntervalList)
989 {
990 List *shardForeignConstraintCommandList = NIL;
991 List *referenceTableForeignConstraintList = NIL;
992 List *commandList = NIL;
993
994 CopyShardForeignConstraintCommandListGrouped(shardInterval,
995 &shardForeignConstraintCommandList,
996 &referenceTableForeignConstraintList);
997
998 commandList = list_concat(commandList, shardForeignConstraintCommandList);
999 commandList = list_concat(commandList, referenceTableForeignConstraintList);
1000
1001 if (PartitionTable(shardInterval->relationId))
1002 {
1003 char *attachPartitionCommand =
1004 GenerateAttachShardPartitionCommand(shardInterval);
1005
1006 commandList = lappend(commandList, attachPartitionCommand);
1007 }
1008
1009 char *tableOwner = TableOwner(shardInterval->relationId);
1010 SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
1011 tableOwner, commandList);
1012
1013 MemoryContextReset(localContext);
1014 }
1015
1016 MemoryContextSwitchTo(oldContext);
1017 }
1018
1019
1020 /*
1021 * CopyPartitionShardsCommandList gets a shardInterval which is a shard that
 * belongs to a partitioned table (this is asserted).
1023 *
1024 * The function returns a list of commands which re-creates all the partitions
1025 * of the input shardInterval.
1026 */
1027 static List *
CopyPartitionShardsCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
                               int32 sourceNodePort)
1030 {
1031 Oid distributedTableId = shardInterval->relationId;
1032 List *ddlCommandList = NIL;
1033
1034 Assert(PartitionedTableNoLock(distributedTableId));
1035
1036 List *partitionList = PartitionList(distributedTableId);
1037 Oid partitionOid = InvalidOid;
1038 foreach_oid(partitionOid, partitionList)
1039 {
1040 uint64 partitionShardId =
1041 ColocatedShardIdInRelation(partitionOid, shardInterval->shardIndex);
1042 ShardInterval *partitionShardInterval = LoadShardInterval(partitionShardId);
1043 bool includeData = false;
1044
1045 List *copyCommandList =
1046 CopyShardCommandList(partitionShardInterval, sourceNodeName, sourceNodePort,
1047 includeData);
1048 ddlCommandList = list_concat(ddlCommandList, copyCommandList);
1049
1050 char *attachPartitionCommand =
1051 GenerateAttachShardPartitionCommand(partitionShardInterval);
1052 ddlCommandList = lappend(ddlCommandList, attachPartitionCommand);
1053 }
1054
1055 return ddlCommandList;
1056 }
1057
1058
1059 /*
 * EnsureShardCanBeRepaired checks if the given shard has a healthy placement on the
 * source node and an inactive placement on the target node.
1062 */
1063 static void
EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
                         const char *targetNodeName, int32 targetNodePort)
1066 {
1067 List *shardPlacementList =
1068 ShardPlacementListIncludingOrphanedPlacements(shardId);
1069
1070 ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError(
1071 shardPlacementList,
1072 sourceNodeName,
1073 sourceNodePort);
1074 if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
1075 {
1076 ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1077 errmsg("source placement must be in active state")));
1078 }
1079
1080 ShardPlacement *targetPlacement = SearchShardPlacementInListOrError(
1081 shardPlacementList,
1082 targetNodeName,
1083 targetNodePort);
1084 if (targetPlacement->shardState != SHARD_STATE_INACTIVE)
1085 {
1086 ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1087 errmsg("target placement must be in inactive state")));
1088 }
1089 }
1090
1091
1092 /*
1093 * EnsureShardCanBeCopied checks if the given shard has a healthy placement in the source
1094 * node and no placements in the target node.
1095 */
1096 static void
EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
                       const char *targetNodeName, int32 targetNodePort)
1099 {
1100 List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
1101
1102 ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError(
1103 shardPlacementList,
1104 sourceNodeName,
1105 sourceNodePort);
1106 if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
1107 {
1108 ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1109 errmsg("source placement must be in active state")));
1110 }
1111
1112 ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
1113 targetNodeName,
1114 targetNodePort);
1115 if (targetPlacement != NULL)
1116 {
1117 if (targetPlacement->shardState == SHARD_STATE_TO_DELETE)
1118 {
1119 /*
1120 * Trigger deletion of orphaned shards and hope that this removes
1121 * the shard.
1122 */
1123 DropOrphanedShardsInSeparateTransaction();
1124 shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
1125 targetPlacement = SearchShardPlacementInList(shardPlacementList,
1126 targetNodeName,
1127 targetNodePort);
1128
1129 /*
1130 * If it still doesn't remove the shard, then we error.
1131 */
1132 if (targetPlacement != NULL)
1133 {
1134 ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1135 errmsg(
1136 "shard " INT64_FORMAT
1137 " still exists on the target node as an orphaned shard",
1138 shardId),
1139 errdetail(
1140 "The existing shard is orphaned, but could not be deleted because there are still active queries on it")));
1141 }
1142 }
1143 else
1144 {
1145 ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1146 errmsg(
1147 "shard " INT64_FORMAT " already exists in the target node",
1148 shardId)));
1149 }
1150 }
1151 }
1152
1153
1154 /*
1155 * SearchShardPlacementInList searches a provided list for a shard placement with the
1156 * specified node name and port. This function returns NULL if no such
1157 * placement exists in the provided list.
1158 */
1159 ShardPlacement *
SearchShardPlacementInList(List *shardPlacementList, const char *nodeName,
                           uint32 nodePort)
1162 {
1163 ShardPlacement *shardPlacement = NULL;
1164 foreach_ptr(shardPlacement, shardPlacementList)
1165 {
1166 if (strncmp(nodeName, shardPlacement->nodeName, MAX_NODE_LENGTH) == 0 &&
1167 nodePort == shardPlacement->nodePort)
1168 {
1169 return shardPlacement;
1170 }
1171 }
1172 return NULL;
1173 }
1174
1175
1176 /*
1177 * SearchShardPlacementInListOrError searches a provided list for a shard
1178 * placement with the specified node name and port. This function throws an
1179 * error if no such placement exists in the provided list.
1180 *
1181 * This is a separate function (instead of using missingOk), so static analysis
1182 * reasons about NULL returns correctly.
1183 */
1184 ShardPlacement *
SearchShardPlacementInListOrError(List *shardPlacementList, const char *nodeName,
                                  uint32 nodePort)
1187 {
1188 ShardPlacement *placement = SearchShardPlacementInList(shardPlacementList, nodeName,
1189 nodePort);
1190 if (placement == NULL)
1191 {
1192 ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
1193 errmsg("could not find placement matching \"%s:%d\"",
1194 nodeName, nodePort),
1195 errhint("Confirm the placement still exists and try again.")));
1196 }
1197 return placement;
1198 }
1199
1200
1201 /*
 * CopyShardCommandList generates a command list to copy the given shard placement
 * from the source node to the target node. To do this it recreates the shard
 * on the target, and then copies the data. The caller can optionally skip
 * copying the data via the includeDataCopy flag.
1206 */
1207 List *
CopyShardCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
                     int32 sourceNodePort, bool includeDataCopy)
1210 {
1211 List *copyShardToNodeCommandsList = RecreateShardDDLCommandList(
1212 shardInterval, sourceNodeName, sourceNodePort);
1213 if (includeDataCopy)
1214 {
1215 copyShardToNodeCommandsList = list_concat(
1216 copyShardToNodeCommandsList,
1217 CopyShardContentsCommandList(shardInterval, sourceNodeName,
1218 sourceNodePort));
1219 }
1220 return list_concat(copyShardToNodeCommandsList,
1221 PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
1222 sourceNodePort));
1223 }
1224
1225
1226 /*
1227 * RecreateShardDDLCommandList generates a command list to recreate a shard,
1228 * but without any data init and without the post-load table creation commands.
1229 */
1230 static List *
RecreateShardDDLCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
                            int32 sourceNodePort)
1233 {
1234 int64 shardId = shardInterval->shardId;
1235 Oid relationId = shardInterval->relationId;
1236
1237 List *tableRecreationCommandList = RecreateTableDDLCommandList(relationId);
1238 return WorkerApplyShardDDLCommandList(tableRecreationCommandList, shardId);
1239 }
1240
1241
1242 /*
1243 * CopyShardContentsCommandList generates a command list to copy the data of the
 * given shard placement from the source node to the target node. This copying
 * requires the shard table to already exist on the target node (created using
 * RecreateShardDDLCommandList).
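 *
 * The generated command is a worker_append_table_to_shard() call that pulls the
 * shard's rows from the source node, e.g. (illustrative names and port):
 *   SELECT worker_append_table_to_shard('public.events_102008',
 *                                       'public.events_102008',
 *                                       'source-host', 5432);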
1247 */
1248 static List *
CopyShardContentsCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
                             int32 sourceNodePort)
1251 {
1252 char *shardName = ConstructQualifiedShardName(shardInterval);
1253 StringInfo copyShardDataCommand = makeStringInfo();
1254 appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
1255 quote_literal_cstr(shardName), /* table to append */
1256 quote_literal_cstr(shardName), /* remote table name */
1257 quote_literal_cstr(sourceNodeName), /* remote host */
1258 sourceNodePort); /* remote port */
1259
1260 return list_make1(copyShardDataCommand->data);
1261 }
1262
1263
1264 /*
 * PostLoadShardCreationCommandList generates a command list to finalize the
 * creation of a shard after the data has been loaded. This creates objects
 * such as the indexes on the table.
1268 */
1269 static List *
PostLoadShardCreationCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
                                 int32 sourceNodePort)
1272 {
1273 int64 shardId = shardInterval->shardId;
1274 Oid relationId = shardInterval->relationId;
1275 bool includeReplicaIdentity = true;
1276 List *indexCommandList =
1277 GetPostLoadTableCreationCommands(relationId, true, includeReplicaIdentity);
1278 return WorkerApplyShardDDLCommandList(indexCommandList, shardId);
1279 }
1280
1281
1282 /*
1283 * CopyShardForeignConstraintCommandList generates command list to create foreign
1284 * constraints existing in source shard after copying it to the other node.
1285 */
1286 List *
CopyShardForeignConstraintCommandList(ShardInterval *shardInterval)
1288 {
1289 List *colocatedShardForeignConstraintCommandList = NIL;
1290 List *referenceTableForeignConstraintList = NIL;
1291
1292 CopyShardForeignConstraintCommandListGrouped(shardInterval,
1293 &
1294 colocatedShardForeignConstraintCommandList,
1295 &referenceTableForeignConstraintList);
1296
1297 return list_concat(colocatedShardForeignConstraintCommandList,
1298 referenceTableForeignConstraintList);
1299 }
1300
1301
1302 /*
 * CopyShardForeignConstraintCommandListGrouped generates command lists to create
 * the foreign constraints existing in the source shard after copying it to the
 * other node, in two separate groups: foreign constraints between hash distributed
 * tables, and foreign constraints from a hash distributed table to reference tables.
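 *
 * Each generated command is a worker_apply_inter_shard_ddl_command() call (see
 * WORKER_APPLY_INTER_SHARD_DDL_COMMAND below) that rewrites the constraint DDL
 * to reference the shard-specific table names on the worker.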
1307 */
1308 void
CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInterval,
                                             List **
                                             colocatedShardForeignConstraintCommandList,
                                             List **referenceTableForeignConstraintList)
1313 {
1314 Oid relationId = shardInterval->relationId;
1315 Oid schemaId = get_rel_namespace(relationId);
1316 char *schemaName = get_namespace_name(schemaId);
1317 char *escapedSchemaName = quote_literal_cstr(schemaName);
1318 int shardIndex = 0;
1319
1320 List *commandList = GetReferencingForeignConstaintCommands(relationId);
1321
1322 /* we will only use shardIndex if there is a foreign constraint */
1323 if (commandList != NIL)
1324 {
1325 shardIndex = ShardIndex(shardInterval);
1326 }
1327
1328 *colocatedShardForeignConstraintCommandList = NIL;
1329 *referenceTableForeignConstraintList = NIL;
1330
1331 const char *command = NULL;
1332 foreach_ptr(command, commandList)
1333 {
1334 char *escapedCommand = quote_literal_cstr(command);
1335
1336 uint64 referencedShardId = INVALID_SHARD_ID;
1337 bool colocatedForeignKey = false;
1338
1339 StringInfo applyForeignConstraintCommand = makeStringInfo();
1340
1341 /* we need to parse the foreign constraint command to get referenced table id */
1342 Oid referencedRelationId = ForeignConstraintGetReferencedTableId(command);
1343 if (referencedRelationId == InvalidOid)
1344 {
1345 ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1346 errmsg("cannot create foreign key constraint"),
1347 errdetail("Referenced relation cannot be found.")));
1348 }
1349
1350 Oid referencedSchemaId = get_rel_namespace(referencedRelationId);
1351 char *referencedSchemaName = get_namespace_name(referencedSchemaId);
1352 char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName);
1353
1354 if (IsCitusTableType(referencedRelationId, REFERENCE_TABLE))
1355 {
1356 referencedShardId = GetFirstShardId(referencedRelationId);
1357 }
1358 else if (IsCitusTableType(referencedRelationId, CITUS_LOCAL_TABLE))
1359 {
1360 /*
1361 * Only reference tables and citus local tables can have foreign
1362 * keys to citus local tables but we already do not allow copying
1363 * citus local table shards and we don't try to replicate citus
1364 * local table shards. So, the referencing table must be a reference
1365 * table in this context.
1366 */
1367 Assert(IsCitusTableType(relationId, REFERENCE_TABLE));
1368
1369 /*
1370 * We don't set foreign keys from reference tables to citus local
1371 * tables in worker shard placements of reference tables because
1372 * we don't have the shard placement for citus local table in worker
1373 * nodes.
1374 */
1375 continue;
1376 }
1377 else
1378 {
1379 referencedShardId = ColocatedShardIdInRelation(referencedRelationId,
1380 shardIndex);
1381
1382 colocatedForeignKey = true;
1383 }
1384
1385 appendStringInfo(applyForeignConstraintCommand,
1386 WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardInterval->shardId,
1387 escapedSchemaName, referencedShardId,
1388 escapedReferencedSchemaName, escapedCommand);
1389
1390 if (colocatedForeignKey)
1391 {
1392 *colocatedShardForeignConstraintCommandList = lappend(
1393 *colocatedShardForeignConstraintCommandList,
1394 applyForeignConstraintCommand->data);
1395 }
1396 else
1397 {
1398 *referenceTableForeignConstraintList = lappend(
1399 *referenceTableForeignConstraintList,
1400 applyForeignConstraintCommand->data);
1401 }
1402 }
1403 }
1404
1405
1406 /*
1407 * GetFirstShardId is a helper function which returns the first
1408 * shardId of the given distributed relation. The function doesn't
1409 * sort the shardIds, so it is mostly useful for reference tables.
1410 */
1411 uint64
GetFirstShardId(Oid relationId)
1413 {
1414 List *shardList = LoadShardList(relationId);
1415 uint64 *shardIdPointer = (uint64 *) linitial(shardList);
1416
1417 return (*shardIdPointer);
1418 }
1419
1420
1421 /*
 * ConstructQualifiedShardName creates the fully qualified name string of the
1423 * given shard in <schema>.<table_name>_<shard_id> format.
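 * For example (illustrative names), shard 102008 of table public.events is
 * named "public.events_102008".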
1424 */
1425 char *
ConstructQualifiedShardName(ShardInterval *shardInterval)
1427 {
1428 Oid schemaId = get_rel_namespace(shardInterval->relationId);
1429 char *schemaName = get_namespace_name(schemaId);
1430 char *tableName = get_rel_name(shardInterval->relationId);
1431
1432 char *shardName = pstrdup(tableName);
1433 AppendShardIdToName(&shardName, shardInterval->shardId);
1434 shardName = quote_qualified_identifier(schemaName, shardName);
1435
1436 return shardName;
1437 }
1438
1439
1440 /*
1441 * RecreateTableDDLCommandList returns a list of DDL statements similar to that
1442 * returned by GetTableCreationCommands except that the list begins with a "DROP TABLE"
1443 * or "DROP FOREIGN TABLE" statement to facilitate idempotent recreation of a placement.
1444 */
1445 static List *
RecreateTableDDLCommandList(Oid relationId)
1447 {
1448 const char *relationName = get_rel_name(relationId);
1449 Oid relationSchemaId = get_rel_namespace(relationId);
1450 const char *relationSchemaName = get_namespace_name(relationSchemaId);
1451 const char *qualifiedRelationName = quote_qualified_identifier(relationSchemaName,
1452 relationName);
1453
1454 StringInfo dropCommand = makeStringInfo();
1455 char relationKind = get_rel_relkind(relationId);
1456 IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
1457
1458 /* build appropriate DROP command based on relation kind */
1459 if (RegularTable(relationId))
1460 {
1461 appendStringInfo(dropCommand, DROP_REGULAR_TABLE_COMMAND,
1462 qualifiedRelationName);
1463 }
1464 else if (relationKind == RELKIND_FOREIGN_TABLE)
1465 {
1466 appendStringInfo(dropCommand, DROP_FOREIGN_TABLE_COMMAND,
1467 qualifiedRelationName);
1468 }
1469 else
1470 {
1471 ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1472 errmsg("repair target is not a regular, foreign or partitioned "
1473 "table")));
1474 }
1475
1476 List *dropCommandList = list_make1(makeTableDDLCommandString(dropCommand->data));
1477 List *createCommandList = GetPreLoadTableCreationCommands(relationId,
1478 includeSequenceDefaults,
1479 NULL);
1480 List *recreateCommandList = list_concat(dropCommandList, createCommandList);
1481
1482 return recreateCommandList;
1483 }
1484
1485
1486 /*
1487 * DropColocatedShardPlacement deletes the shard placement metadata for the given shard
1488 * placement from the pg_dist_placement, and then it drops the shard table
1489 * from the given node. The function does this for all colocated placements.
1490 */
1491 static void
DropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32 nodePort)
1493 {
1494 List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
1495 ListCell *colocatedShardCell = NULL;
1496
1497 foreach(colocatedShardCell, colocatedShardList)
1498 {
1499 ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
1500 char *qualifiedTableName = ConstructQualifiedShardName(colocatedShard);
1501 StringInfo dropQuery = makeStringInfo();
1502 uint64 shardId = colocatedShard->shardId;
1503 List *shardPlacementList =
1504 ShardPlacementListIncludingOrphanedPlacements(shardId);
1505 ShardPlacement *placement =
1506 SearchShardPlacementInListOrError(shardPlacementList, nodeName, nodePort);
1507
1508 appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);
1509
1510 DeleteShardPlacementRow(placement->placementId);
1511 SendCommandToWorker(nodeName, nodePort, dropQuery->data);
1512 }
1513 }
1514
1515
1516 /*
1517 * MarkForDropColocatedShardPlacement marks the shard placement metadata for
1518 * the given shard placement to be deleted in pg_dist_placement. The function
1519 * does this for all colocated placements.
1520 */
1521 static void
MarkForDropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32
                                   nodePort)
1524 {
1525 List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
1526 ListCell *colocatedShardCell = NULL;
1527
1528 foreach(colocatedShardCell, colocatedShardList)
1529 {
1530 ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
1531 uint64 shardId = colocatedShard->shardId;
1532 List *shardPlacementList =
1533 ShardPlacementListIncludingOrphanedPlacements(shardId);
1534 ShardPlacement *placement =
1535 SearchShardPlacementInListOrError(shardPlacementList, nodeName, nodePort);
1536
1537 UpdateShardPlacementState(placement->placementId, SHARD_STATE_TO_DELETE);
1538 }
1539 }
1540
1541
1542 /*
1543 * UpdateColocatedShardPlacementMetadataOnWorkers updates the metadata about the
1544 * placements of the given shard and its colocated shards by changing the nodename and
1545 * nodeport of the shards from the source nodename/port to target nodename/port.
1546 *
1547 * Note that the function does nothing if the given shard belongs to a non-mx table.
1548 */
1549 static void
UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId,
                                               char *sourceNodeName, int32 sourceNodePort,
                                               char *targetNodeName, int32 targetNodePort)
1553 {
1554 ShardInterval *shardInterval = LoadShardInterval(shardId);
1555 ListCell *colocatedShardCell = NULL;
1556 bool shouldSyncMetadata = ShouldSyncTableMetadata(shardInterval->relationId);
1557
1558 if (!shouldSyncMetadata)
1559 {
1560 return;
1561 }
1562
1563 uint32 sourceGroupId = GroupForNode(sourceNodeName, sourceNodePort);
1564 uint32 targetGroupId = GroupForNode(targetNodeName, targetNodePort);
1565
1566 List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
1567
/* iterate through the colocated shards and update the metadata of each */
1569 foreach(colocatedShardCell, colocatedShardList)
1570 {
1571 ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
1572 StringInfo updateCommand = makeStringInfo();
1573
1574 appendStringInfo(updateCommand,
1575 "SELECT citus_internal_update_placement_metadata(%ld, %d, %d)",
1576 colocatedShard->shardId,
1577 sourceGroupId, targetGroupId);
1578 SendCommandToWorkersWithMetadata(updateCommand->data);
1579 }
1580 }
1581
1582
1583 /*
1584 * WorkerApplyShardDDLCommandList wraps all DDL commands in ddlCommandList
1585 * in a call to worker_apply_shard_ddl_command to apply the DDL command to
1586 * the shard specified by shardId.
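 *
 * For example (illustrative), a 'CREATE INDEX ...' DDL string for shard 102008
 * becomes a SELECT worker_apply_shard_ddl_command(...) statement that applies
 * the command to the shard-specific table on the worker.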
1587 */
1588 List *
WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId)
1590 {
1591 List *applyDDLCommandList = NIL;
1592
1593 TableDDLCommand *ddlCommand = NULL;
1594 foreach_ptr(ddlCommand, ddlCommandList)
1595 {
1596 Assert(CitusIsA(ddlCommand, TableDDLCommand));
1597 char *applyDDLCommand = GetShardedTableDDLCommand(ddlCommand, shardId, NULL);
1598 applyDDLCommandList = lappend(applyDDLCommandList, applyDDLCommand);
1599 }
1600
1601 return applyDDLCommandList;
1602 }
1603