/*-------------------------------------------------------------------------
 *
 * reference_table_utils.c
 *
 * Utility functions related to reference tables.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "miscadmin.h"

#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/genam.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "postmaster/postmaster.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"

/* local function forward declarations */
static List * WorkersWithoutReferenceTablePlacement(uint64 shardId, LOCKMODE lockMode);
static StringInfo CopyShardPlacementToWorkerNodeQuery(
	ShardPlacement *sourceShardPlacement,
	WorkerNode *workerNode,
	char transferMode);
static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName,
								 int nodePort);
static bool AnyRelationsModifiedInTransaction(List *relationIdList);

/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
PG_FUNCTION_INFO_V1(replicate_reference_tables);
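
/*
 * A sketch of how UDFs exported via PG_FUNCTION_INFO_V1 are typically exposed
 * at the SQL level; the authoritative definitions live in the extension's SQL
 * migration scripts:
 *
 *   CREATE FUNCTION replicate_reference_tables()
 *   RETURNS void
 *   LANGUAGE C STRICT
 *   AS 'MODULE_PATHNAME', $$replicate_reference_tables$$;
 */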

/*
 * replicate_reference_tables is a UDF to ensure that all reference tables are
 * replicated to all nodes.
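 *
 * Usage (SQL): SELECT replicate_reference_tables();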
 */
Datum
replicate_reference_tables(PG_FUNCTION_ARGS)
{
	EnsureReferenceTablesExistOnAllNodes();

	PG_RETURN_VOID();
}


/*
 * EnsureReferenceTablesExistOnAllNodes ensures that a shard placement for every
 * reference table exists on all nodes. If a node does not have a set of shard
 * placements, then master_copy_shard_placement is called in a subtransaction
 * to pull the data to the new node.
 */
void
EnsureReferenceTablesExistOnAllNodes(void)
{
	EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES);
}


/*
 * EnsureReferenceTablesExistOnAllNodesExtended ensures that a shard placement for every
 * reference table exists on all nodes. If a node does not have a set of shard placements,
 * then master_copy_shard_placement is called in a subtransaction to pull the data to the
 * new node.
 *
 * The transferMode is passed on to the copy implementation to control the
 * locking behavior and the data transfer method.
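 *
 * For example, TRANSFER_MODE_BLOCK_WRITES maps to 'block_writes' and
 * TRANSFER_MODE_FORCE_LOGICAL to 'force_logical' in the generated
 * master_copy_shard_placement call; any other value falls back to 'auto'
 * (see CopyShardPlacementToWorkerNodeQuery below).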
 */
void
EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
{
	/*
	 * Prevent this function from running concurrently with itself.
	 *
	 * It also prevents concurrent DROP TABLE or DROP SCHEMA. We need this
	 * because throughout this function we assume values in referenceTableIdList
	 * are still valid.
	 *
	 * We don't need to handle other kinds of reference table DML/DDL here, since
	 * master_copy_shard_placement gets enough locks for that.
	 *
	 * We also don't need special handling for concurrent create_reference_table,
	 * since that will trigger a call to this function from another backend, which
	 * will block until our call is finished.
	 */
	int colocationId = CreateReferenceTableColocationId();
	LockColocationId(colocationId, ExclusiveLock);

	List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
	if (referenceTableIdList == NIL)
	{
		/* no reference tables exist */
		UnlockColocationId(colocationId, ExclusiveLock);
		return;
	}

	Oid referenceTableId = linitial_oid(referenceTableIdList);
	const char *referenceTableName = get_rel_name(referenceTableId);
	List *shardIntervalList = LoadShardIntervalList(referenceTableId);
	if (list_length(shardIntervalList) == 0)
	{
		/* check for corrupt metadata */
		ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
							   referenceTableName)));
	}

	ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
	uint64 shardId = shardInterval->shardId;

	/*
	 * We only take an access share lock, otherwise we'll hold up citus_add_node.
	 * In case of create_reference_table() where we don't want concurrent writes
	 * to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
	 */
	List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId,
																 AccessShareLock);
	if (list_length(newWorkersList) == 0)
	{
		/* nothing to do, no need for lock */
		UnlockColocationId(colocationId, ExclusiveLock);
		return;
	}

	/*
	 * master_copy_shard_placement triggers metadata sync-up, which tries to
	 * acquire a ShareLock on pg_dist_node. We do master_copy_shard_placement
	 * in a separate connection. If we have modified pg_dist_node in the
	 * current backend, this will cause a deadlock.
	 */
	if (TransactionModifiedNodeMetadata)
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot replicate reference tables in a transaction "
							   "that modified node metadata")));
	}

	/*
	 * Modifications to reference tables in current transaction are not visible
	 * to master_copy_shard_placement, since it is done in a separate backend.
	 */
	if (AnyRelationsModifiedInTransaction(referenceTableIdList))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot replicate reference tables in a transaction "
							   "that modified a reference table")));
	}

	bool missingOk = false;
	ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk);
	if (sourceShardPlacement == NULL)
	{
		/* check for corrupt metadata */
		ereport(ERROR, (errmsg("reference table shard "
							   UINT64_FORMAT
							   " does not have an active shard placement",
							   shardId)));
	}

	WorkerNode *newWorkerNode = NULL;
	foreach_ptr(newWorkerNode, newWorkersList)
	{
		ereport(NOTICE, (errmsg("replicating reference table '%s' to %s:%d ...",
								referenceTableName, newWorkerNode->workerName,
								newWorkerNode->workerPort)));

		/*
		 * Call master_copy_shard_placement using citus extension owner. Current
		 * user might not have permissions to do the copy.
		 */
		const char *userName = CitusExtensionOwnerName();
		int connectionFlags = OUTSIDE_TRANSACTION;

		MultiConnection *connection = GetNodeUserDatabaseConnection(
			connectionFlags, LocalHostName, PostPortNumber,
			userName, NULL);

		if (PQstatus(connection->pgConn) == CONNECTION_OK)
		{
			UseCoordinatedTransaction();

			RemoteTransactionBegin(connection);
			StringInfo placementCopyCommand =
				CopyShardPlacementToWorkerNodeQuery(sourceShardPlacement,
													newWorkerNode,
													transferMode);
			ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data);
			RemoteTransactionCommit(connection);
		}
		else
		{
			ereport(ERROR, (errmsg("could not open a connection to localhost "
								   "when replicating reference tables"),
							errdetail(
								"citus.replicate_reference_tables_on_activate = false "
								"requires localhost connectivity.")));
		}

		CloseConnection(connection);
	}

	/*
	 * Unblock other backends; they will probably observe that there are no
	 * more worker nodes without placements, unless nodes were added concurrently.
	 */
	UnlockColocationId(colocationId, ExclusiveLock);
}


/*
 * AnyRelationsModifiedInTransaction returns true if any of the given relations
 * were modified in the current transaction.
 */
static bool
AnyRelationsModifiedInTransaction(List *relationIdList)
{
	Oid relationId = InvalidOid;

	foreach_oid(relationId, relationIdList)
	{
		if (GetRelationDDLAccessMode(relationId) != RELATION_NOT_ACCESSED ||
			GetRelationDMLAccessMode(relationId) != RELATION_NOT_ACCESSED)
		{
			return true;
		}
	}

	return false;
}


/*
 * WorkersWithoutReferenceTablePlacement returns a list of workers (WorkerNode) that
 * do not yet have a placement for the given reference table shard ID, but are
 * supposed to.
 */
static List *
WorkersWithoutReferenceTablePlacement(uint64 shardId, LOCKMODE lockMode)
{
	List *workersWithoutPlacements = NIL;

	List *shardPlacementList = ActiveShardPlacementList(shardId);

	List *workerNodeList = ReferenceTablePlacementNodeList(lockMode);
	workerNodeList = SortList(workerNodeList, CompareWorkerNodes);

	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerNodeList)
	{
		char *nodeName = workerNode->workerName;
		uint32 nodePort = workerNode->workerPort;
		ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
																	 nodeName, nodePort);
		if (targetPlacement == NULL)
		{
			workersWithoutPlacements = lappend(workersWithoutPlacements, workerNode);
		}
	}

	return workersWithoutPlacements;
}


/*
 * CopyShardPlacementToWorkerNodeQuery returns the master_copy_shard_placement
 * command to copy the given shard placement to the given node.
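 *
 * With hypothetical placement values, the generated command looks like:
 *
 *   SELECT master_copy_shard_placement(102008, 'node-a', 5432, 'node-b', 5432,
 *                                      do_repair := false,
 *                                      transfer_mode := 'block_writes')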
 */
static StringInfo
CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement,
									WorkerNode *workerNode,
									char transferMode)
{
	StringInfo queryString = makeStringInfo();

	const char *transferModeString =
		transferMode == TRANSFER_MODE_BLOCK_WRITES ? "block_writes" :
		transferMode == TRANSFER_MODE_FORCE_LOGICAL ? "force_logical" :
		"auto";

	appendStringInfo(queryString,
					 "SELECT master_copy_shard_placement("
					 UINT64_FORMAT ", %s, %d, %s, %d, do_repair := false, "
					 "transfer_mode := %s)",
					 sourceShardPlacement->shardId,
					 quote_literal_cstr(sourceShardPlacement->nodeName),
					 sourceShardPlacement->nodePort,
					 quote_literal_cstr(workerNode->workerName),
					 workerNode->workerPort,
					 quote_literal_cstr(transferModeString));

	return queryString;
}


/*
 * upgrade_to_reference_table was removed, but we maintain a dummy implementation
 * to support downgrades.
 */
Datum
upgrade_to_reference_table(PG_FUNCTION_ARGS)
{
	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("this function is deprecated and no longer used")));
}


/*
 * ReplicateShardToNode replicates the given shard to the given worker node in
 * a separate transaction. If the worker already has a replica of the shard,
 * this is a no-op. This function also modifies metadata by inserting/updating
 * related rows in pg_dist_placement.
 *
 * IMPORTANT: This should only be used to replicate shards of a reference
 * table.
 */
static void
ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
{
	uint64 shardId = shardInterval->shardId;

	bool missingOk = false;
	ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk);
	char *srcNodeName = sourceShardPlacement->nodeName;
	uint32 srcNodePort = sourceShardPlacement->nodePort;
	bool includeData = true;
	List *ddlCommandList =
		CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData);

	List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
	ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
																 nodeName, nodePort);
	char *tableOwner = TableOwner(shardInterval->relationId);

	if (targetPlacement != NULL)
	{
		if (targetPlacement->shardState == SHARD_STATE_ACTIVE)
		{
			/* We already have the shard, nothing to do */
			return;
		}
		ereport(ERROR, (errmsg(
							"Placement for reference table \"%s\" on node %s:%d is "
							"not active. This should not be possible, please report "
							"this as a bug",
							get_rel_name(shardInterval->relationId), nodeName,
							nodePort)));
	}

	ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d",
							get_rel_name(shardInterval->relationId), nodeName,
							nodePort)));

	EnsureNoModificationsHaveBeenDone();
	SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner,
											  ddlCommandList);
	int32 groupId = GroupForNode(nodeName, nodePort);

	uint64 placementId = GetNextPlacementId();
	InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, 0,
							groupId);

	if (ShouldSyncTableMetadata(shardInterval->relationId))
	{
		char *placementCommand = PlacementUpsertCommand(shardId, placementId,
														SHARD_STATE_ACTIVE, 0,
														groupId);

		SendCommandToWorkersWithMetadata(placementCommand);
	}
}


/*
 * CreateReferenceTableColocationId creates a new colocation id for reference
 * tables and writes it into pg_dist_colocation, then returns the created
 * colocation id. Since there can be only one colocation group for all
 * reference tables, if a colocation id already exists for reference tables,
 * this function returns it without creating anything.
 */
uint32
CreateReferenceTableColocationId()
{
	int shardCount = 1;
	Oid distributionColumnType = InvalidOid;
	Oid distributionColumnCollation = InvalidOid;

	/*
	 * We no longer maintain the replication factor of reference tables and
	 * just use -1 instead. This value is not used anywhere.
	 */
	int replicationFactor = -1;

	/* check for existing colocations */
	uint32 colocationId =
		ColocationId(shardCount, replicationFactor, distributionColumnType,
					 distributionColumnCollation);

	if (colocationId == INVALID_COLOCATION_ID)
	{
		colocationId = CreateColocationGroup(shardCount, replicationFactor,
											 distributionColumnType,
											 distributionColumnCollation);
	}

	return colocationId;
}


/*
 * DeleteAllReferenceTablePlacementsFromNodeGroup iterates over the list of
 * reference tables and deletes all of their placements from the
 * pg_dist_placement table for the given group.
 */
void
DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId)
{
	List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE);
	List *referenceShardIntervalList = NIL;

	/* if there are no reference tables, we do not need to do anything */
	if (list_length(referenceTableList) == 0)
	{
		return;
	}

	/*
	 * We sort the reference table list to prevent deadlocks in concurrent
	 * DeleteAllReferenceTablePlacementsFromNodeGroup calls.
	 */
	referenceTableList = SortList(referenceTableList, CompareOids);
	if (ClusterHasKnownMetadataWorkers())
	{
		referenceShardIntervalList = GetSortedReferenceShardIntervals(referenceTableList);

		BlockWritesToShardList(referenceShardIntervalList);
	}

	StringInfo deletePlacementCommand = makeStringInfo();
	Oid referenceTableId = InvalidOid;
	foreach_oid(referenceTableId, referenceTableList)
	{
		List *placements = GroupShardPlacementsForTableOnGroup(referenceTableId,
															   groupId);
		if (list_length(placements) == 0)
		{
			/* this happens if the node was previously disabled */
			continue;
		}

		GroupShardPlacement *placement = (GroupShardPlacement *) linitial(placements);

		LockShardDistributionMetadata(placement->shardId, ExclusiveLock);

		DeleteShardPlacementRow(placement->placementId);

		resetStringInfo(deletePlacementCommand);
		appendStringInfo(deletePlacementCommand,
						 "DELETE FROM pg_dist_placement WHERE placementid = "
						 UINT64_FORMAT,
						 placement->placementId);
		SendCommandToWorkersWithMetadata(deletePlacementCommand->data);
	}
}


/* CompareOids is a comparison function for sorting lists of OIDs */
int
CompareOids(const void *leftElement, const void *rightElement)
{
	Oid *leftId = (Oid *) leftElement;
	Oid *rightId = (Oid *) rightElement;

	if (*leftId > *rightId)
	{
		return 1;
	}
	else if (*leftId < *rightId)
	{
		return -1;
	}
	else
	{
		return 0;
	}
}


/*
 * ReferenceTableReplicationFactor returns the replication factor for
 * reference tables.
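 *
 * This is simply the number of nodes that are eligible to hold reference
 * table placements; for example, with three such nodes it returns 3.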
 */
int
ReferenceTableReplicationFactor(void)
{
	List *nodeList = ReferenceTablePlacementNodeList(NoLock);
	int replicationFactor = list_length(nodeList);
	return replicationFactor;
}


/*
 * ReplicateAllReferenceTablesToNode finds all reference tables and replicates
 * them to the given worker node. It also updates the replication factor
 * column in the pg_dist_colocation table when necessary. To prevent
 * unnecessary data transfer, this function skips a reference table if the
 * node already has a healthy placement of it.
 */
void
ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
{
	List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE);

	/* if there is no reference table, we do not need to replicate anything */
	if (list_length(referenceTableList) > 0)
	{
		List *referenceShardIntervalList = NIL;

		/*
		 * We sort the reference table list to prevent deadlocks in concurrent
		 * ReplicateAllReferenceTablesToNode calls.
		 */
		referenceTableList = SortList(referenceTableList, CompareOids);
		Oid referenceTableId = InvalidOid;
		foreach_oid(referenceTableId, referenceTableList)
		{
			List *shardIntervalList = LoadShardIntervalList(referenceTableId);
			ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);

			referenceShardIntervalList = lappend(referenceShardIntervalList,
												 shardInterval);
		}

		if (ClusterHasKnownMetadataWorkers())
		{
			BlockWritesToShardList(referenceShardIntervalList);
		}

		ShardInterval *shardInterval = NULL;
		foreach_ptr(shardInterval, referenceShardIntervalList)
		{
			uint64 shardId = shardInterval->shardId;

			LockShardDistributionMetadata(shardId, ExclusiveLock);

			ReplicateShardToNode(shardInterval, nodeName, nodePort);
		}

		/* create foreign constraints between reference tables */
		foreach_ptr(shardInterval, referenceShardIntervalList)
		{
			char *tableOwner = TableOwner(shardInterval->relationId);
			List *commandList = CopyShardForeignConstraintCommandList(shardInterval);

			SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner,
													  commandList);
		}
	}
}