/*-------------------------------------------------------------------------
 *
 * reference_table_utils.c
 *
 * Utility functions related to reference tables.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "miscadmin.h"

#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/genam.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "postmaster/postmaster.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"

/* local function forward declarations */
static List * WorkersWithoutReferenceTablePlacement(uint64 shardId, LOCKMODE lockMode);
static StringInfo CopyShardPlacementToWorkerNodeQuery(
	ShardPlacement *sourceShardPlacement,
	WorkerNode *workerNode,
	char transferMode);
static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName,
								 int nodePort);
static bool AnyRelationsModifiedInTransaction(List *relationIdList);

/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
PG_FUNCTION_INFO_V1(replicate_reference_tables);

/*
 * replicate_reference_tables is a UDF to ensure that all reference tables are
 * replicated to all nodes.
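 *
 * For example, it can be invoked from SQL with:
 *
 *   SELECT replicate_reference_tables();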
 */
Datum
replicate_reference_tables(PG_FUNCTION_ARGS)
{
	EnsureReferenceTablesExistOnAllNodes();

	PG_RETURN_VOID();
}


/*
 * EnsureReferenceTablesExistOnAllNodes ensures that a shard placement for every
 * reference table exists on all nodes. If a node does not have a set of shard
 * placements, then master_copy_shard_placement is called in a subtransaction
 * to pull the data to the new node.
 */
void
EnsureReferenceTablesExistOnAllNodes(void)
{
	EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES);
}


/*
 * EnsureReferenceTablesExistOnAllNodesExtended ensures that a shard placement for every
 * reference table exists on all nodes. If a node does not have a set of shard placements,
 * then master_copy_shard_placement is called in a subtransaction to pull the data to the
 * new node.
 *
 * The transferMode is passed on to the implementation of the copy to control the locks
 * taken and the data transfer method.
 */
void
EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
{
	/*
	 * Prevent this function from running concurrently with itself.
	 *
	 * It also prevents concurrent DROP TABLE or DROP SCHEMA. We need this
	 * because throughout this function we assume values in referenceTableIdList
	 * are still valid.
	 *
	 * We don't need to handle other kinds of reference table DML/DDL here, since
	 * master_copy_shard_placement gets enough locks for that.
	 *
	 * We also don't need special handling for concurrent create_reference_table,
	 * since that will trigger a call to this function from another backend,
	 * which will block until our call is finished.
	 */
	int colocationId = CreateReferenceTableColocationId();
	LockColocationId(colocationId, ExclusiveLock);

	List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
	if (referenceTableIdList == NIL)
	{
		/* no reference tables exist */
		UnlockColocationId(colocationId, ExclusiveLock);
		return;
	}

	Oid referenceTableId = linitial_oid(referenceTableIdList);
	const char *referenceTableName = get_rel_name(referenceTableId);
	List *shardIntervalList = LoadShardIntervalList(referenceTableId);
	if (list_length(shardIntervalList) == 0)
	{
		/* check for corrupt metadata */
		ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
							   referenceTableName)));
	}

	ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
	uint64 shardId = shardInterval->shardId;

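	/*
	 * All reference tables share a single colocation group, so this one shard
	 * is representative: a worker that is missing its placement is missing the
	 * placements of every reference table, and copying it below also covers
	 * the colocated reference table shards.
	 */
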
	/*
	 * We only take an access share lock, otherwise we'll hold up citus_add_node.
	 * In the case of create_reference_table(), where we don't want concurrent
	 * writes to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
	 */
	List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId,
																 AccessShareLock);
	if (list_length(newWorkersList) == 0)
	{
		/* nothing to do, no need for lock */
		UnlockColocationId(colocationId, ExclusiveLock);
		return;
	}

	/*
	 * master_copy_shard_placement triggers metadata sync-up, which tries to
	 * acquire a ShareLock on pg_dist_node. We do master_copy_shard_placement
	 * in a separate connection. If we have modified pg_dist_node in the
	 * current backend, this will cause a deadlock.
	 */
	if (TransactionModifiedNodeMetadata)
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot replicate reference tables in a transaction "
							   "that modified node metadata")));
	}

	/*
	 * Modifications to reference tables in the current transaction are not
	 * visible to master_copy_shard_placement, since it is done in a separate
	 * backend.
	 */
	if (AnyRelationsModifiedInTransaction(referenceTableIdList))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot replicate reference tables in a transaction "
							   "that modified a reference table")));
	}

	bool missingOk = false;
	ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk);
	if (sourceShardPlacement == NULL)
	{
		/* check for corrupt metadata */
		ereport(ERROR, (errmsg("reference table shard "
							   UINT64_FORMAT
							   " does not have an active shard placement",
							   shardId)));
	}

	WorkerNode *newWorkerNode = NULL;
	foreach_ptr(newWorkerNode, newWorkersList)
	{
		ereport(NOTICE, (errmsg("replicating reference table '%s' to %s:%d ...",
								referenceTableName, newWorkerNode->workerName,
								newWorkerNode->workerPort)));

		/*
		 * Call master_copy_shard_placement using the citus extension owner,
		 * since the current user might not have permissions to do the copy.
		 */
		const char *userName = CitusExtensionOwnerName();
		int connectionFlags = OUTSIDE_TRANSACTION;

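		/*
		 * Connect back to the current node so that master_copy_shard_placement
		 * runs in a separate backend, outside of the current transaction.
		 */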
		MultiConnection *connection = GetNodeUserDatabaseConnection(
			connectionFlags, LocalHostName, PostPortNumber,
			userName, NULL);

		if (PQstatus(connection->pgConn) == CONNECTION_OK)
		{
			UseCoordinatedTransaction();

			RemoteTransactionBegin(connection);
			StringInfo placementCopyCommand =
				CopyShardPlacementToWorkerNodeQuery(sourceShardPlacement,
													newWorkerNode,
													transferMode);
			ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data);
			RemoteTransactionCommit(connection);
		}
		else
		{
			ereport(ERROR, (errmsg("could not open a connection to localhost "
								   "when replicating reference tables"),
							errdetail(
								"citus.replicate_reference_tables_on_activate = false "
								"requires localhost connectivity.")));
		}

		CloseConnection(connection);
	}

	/*
	 * Unblock other backends; they will probably observe that there are no
	 * more worker nodes without placements, unless nodes were added
	 * concurrently.
	 */
	UnlockColocationId(colocationId, ExclusiveLock);
}


/*
 * AnyRelationsModifiedInTransaction returns true if any of the given relations
 * were modified in the current transaction.
 */
static bool
AnyRelationsModifiedInTransaction(List *relationIdList)
{
	Oid relationId = InvalidOid;

	foreach_oid(relationId, relationIdList)
	{
		if (GetRelationDDLAccessMode(relationId) != RELATION_NOT_ACCESSED ||
			GetRelationDMLAccessMode(relationId) != RELATION_NOT_ACCESSED)
		{
			return true;
		}
	}

	return false;
}


/*
 * WorkersWithoutReferenceTablePlacement returns a list of workers (WorkerNode) that
 * do not yet have a placement for the given reference table shard ID, but are
 * supposed to.
 */
static List *
WorkersWithoutReferenceTablePlacement(uint64 shardId, LOCKMODE lockMode)
{
	List *workersWithoutPlacements = NIL;

	List *shardPlacementList = ActiveShardPlacementList(shardId);

	List *workerNodeList = ReferenceTablePlacementNodeList(lockMode);
	workerNodeList = SortList(workerNodeList, CompareWorkerNodes);

	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerNodeList)
	{
		char *nodeName = workerNode->workerName;
		uint32 nodePort = workerNode->workerPort;
		ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
																	 nodeName, nodePort);
		if (targetPlacement == NULL)
		{
			workersWithoutPlacements = lappend(workersWithoutPlacements, workerNode);
		}
	}

	return workersWithoutPlacements;
}


/*
 * CopyShardPlacementToWorkerNodeQuery returns the master_copy_shard_placement
 * command to copy the given shard placement to the given node.
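 *
 * The generated command has the following shape (values shown are
 * illustrative):
 *
 *   SELECT master_copy_shard_placement(102008, 'worker-1', 5432, 'worker-2', 5432,
 *                                      do_repair := false,
 *                                      transfer_mode := 'block_writes')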
 */
static StringInfo
CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement,
									WorkerNode *workerNode,
									char transferMode)
{
	StringInfo queryString = makeStringInfo();

	const char *transferModeString =
		transferMode == TRANSFER_MODE_BLOCK_WRITES ? "block_writes" :
		transferMode == TRANSFER_MODE_FORCE_LOGICAL ? "force_logical" :
		"auto";

	appendStringInfo(queryString,
					 "SELECT master_copy_shard_placement("
					 UINT64_FORMAT ", %s, %d, %s, %d, do_repair := false, "
								   "transfer_mode := %s)",
					 sourceShardPlacement->shardId,
					 quote_literal_cstr(sourceShardPlacement->nodeName),
					 sourceShardPlacement->nodePort,
					 quote_literal_cstr(workerNode->workerName),
					 workerNode->workerPort,
					 quote_literal_cstr(transferModeString));

	return queryString;
}


/*
 * upgrade_to_reference_table was removed, but we maintain a dummy implementation
 * to support downgrades.
 */
Datum
upgrade_to_reference_table(PG_FUNCTION_ARGS)
{
	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("this function is deprecated and no longer used")));
}


/*
 * ReplicateShardToNode replicates the given shard to the given worker node
 * in a separate transaction. If the worker already has a replica of the shard,
 * this is a no-op. This function also modifies metadata by inserting/updating
 * related rows in pg_dist_placement.
 *
 * IMPORTANT: This should only be used to replicate shards of a reference
 * table.
 */
static void
ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
{
	uint64 shardId = shardInterval->shardId;

	bool missingOk = false;
	ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk);
	char *srcNodeName = sourceShardPlacement->nodeName;
	uint32 srcNodePort = sourceShardPlacement->nodePort;
	bool includeData = true;
	List *ddlCommandList =
		CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData);

	List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
	ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
																 nodeName, nodePort);
	char *tableOwner = TableOwner(shardInterval->relationId);

	if (targetPlacement != NULL)
	{
		if (targetPlacement->shardState == SHARD_STATE_ACTIVE)
		{
			/* We already have the shard, nothing to do */
			return;
		}
		ereport(ERROR, (errmsg(
							"Placement for reference table \"%s\" on node %s:%d is not active. This should not be possible, please report this as a bug",
							get_rel_name(shardInterval->relationId), nodeName,
							nodePort)));
	}

	ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d",
							get_rel_name(shardInterval->relationId), nodeName,
							nodePort)));

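	/*
	 * Copy the shard to the target node and record the new placement, both in
	 * the local pg_dist_placement and, when metadata is synced, on the
	 * metadata worker nodes.
	 */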
	EnsureNoModificationsHaveBeenDone();
	SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner,
											  ddlCommandList);
	int32 groupId = GroupForNode(nodeName, nodePort);

	uint64 placementId = GetNextPlacementId();
	InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, 0,
							groupId);

	if (ShouldSyncTableMetadata(shardInterval->relationId))
	{
		char *placementCommand = PlacementUpsertCommand(shardId, placementId,
														SHARD_STATE_ACTIVE, 0,
														groupId);

		SendCommandToWorkersWithMetadata(placementCommand);
	}
}


/*
 * CreateReferenceTableColocationId creates a new colocation id for reference tables
 * and writes it into pg_dist_colocation, then returns the created colocation id.
 * Since there can be only one colocation group for all kinds of reference tables,
 * if a colocation id has already been created for reference tables, this function
 * returns it without creating anything.
 */
uint32
CreateReferenceTableColocationId()
{
	int shardCount = 1;
	Oid distributionColumnType = InvalidOid;
	Oid distributionColumnCollation = InvalidOid;

	/*
	 * We don't maintain the replication factor of reference tables anymore and
	 * just use -1 instead. We don't use this value anywhere.
	 */
	int replicationFactor = -1;

	/* check for existing colocations */
	uint32 colocationId =
		ColocationId(shardCount, replicationFactor, distributionColumnType,
					 distributionColumnCollation);

	if (colocationId == INVALID_COLOCATION_ID)
	{
		colocationId = CreateColocationGroup(shardCount, replicationFactor,
											 distributionColumnType,
											 distributionColumnCollation);
	}

	return colocationId;
}


/*
 * DeleteAllReferenceTablePlacementsFromNodeGroup iterates over the list of
 * reference tables and deletes all reference table placements from the
 * pg_dist_placement table for the given group.
 */
void
DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId)
{
	List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE);
	List *referenceShardIntervalList = NIL;

	/* if there are no reference tables, we do not need to do anything */
	if (list_length(referenceTableList) == 0)
	{
		return;
	}

	/*
	 * We sort the reference table list to prevent deadlocks in concurrent
	 * DeleteAllReferenceTablePlacementsFromNodeGroup calls.
	 */
	referenceTableList = SortList(referenceTableList, CompareOids);
	if (ClusterHasKnownMetadataWorkers())
	{
		referenceShardIntervalList = GetSortedReferenceShardIntervals(referenceTableList);

		BlockWritesToShardList(referenceShardIntervalList);
	}

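	/*
	 * Delete each reference table's placement row for this group, both from
	 * the local pg_dist_placement and from worker nodes that have the metadata.
	 */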
	StringInfo deletePlacementCommand = makeStringInfo();
	Oid referenceTableId = InvalidOid;
	foreach_oid(referenceTableId, referenceTableList)
	{
		List *placements = GroupShardPlacementsForTableOnGroup(referenceTableId,
															   groupId);
		if (list_length(placements) == 0)
		{
			/* this happens if the node was previously disabled */
			continue;
		}

		GroupShardPlacement *placement = (GroupShardPlacement *) linitial(placements);

		LockShardDistributionMetadata(placement->shardId, ExclusiveLock);

		DeleteShardPlacementRow(placement->placementId);

		resetStringInfo(deletePlacementCommand);
		appendStringInfo(deletePlacementCommand,
						 "DELETE FROM pg_dist_placement WHERE placementid = "
						 UINT64_FORMAT,
						 placement->placementId);
		SendCommandToWorkersWithMetadata(deletePlacementCommand->data);
	}
}


/* CompareOids is a comparison function for sorting shard OIDs */
int
CompareOids(const void *leftElement, const void *rightElement)
{
	Oid *leftId = (Oid *) leftElement;
	Oid *rightId = (Oid *) rightElement;

	if (*leftId > *rightId)
	{
		return 1;
	}
	else if (*leftId < *rightId)
	{
		return -1;
	}
	else
	{
		return 0;
	}
}


/*
 * ReferenceTableReplicationFactor returns the replication factor for
 * reference tables.
 */
int
ReferenceTableReplicationFactor(void)
{
	List *nodeList = ReferenceTablePlacementNodeList(NoLock);
	int replicationFactor = list_length(nodeList);
	return replicationFactor;
}


/*
 * ReplicateAllReferenceTablesToNode finds all reference tables and replicates
 * them to the given worker node. It also modifies the pg_dist_colocation table
 * to update the replication factor column when necessary. This function skips
 * a reference table if the node already has a healthy placement of it, to
 * prevent unnecessary data transfer.
 */
void
ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
{
	List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE);

	/* if there is no reference table, we do not need to replicate anything */
	if (list_length(referenceTableList) > 0)
	{
		List *referenceShardIntervalList = NIL;

		/*
		 * We sort the reference table list to prevent deadlocks in concurrent
		 * calls of this function.
		 */
		referenceTableList = SortList(referenceTableList, CompareOids);
		Oid referenceTableId = InvalidOid;
		foreach_oid(referenceTableId, referenceTableList)
		{
			List *shardIntervalList = LoadShardIntervalList(referenceTableId);
			ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);

			referenceShardIntervalList = lappend(referenceShardIntervalList,
												 shardInterval);
		}

		if (ClusterHasKnownMetadataWorkers())
		{
			BlockWritesToShardList(referenceShardIntervalList);
		}

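		/*
		 * Replicate each reference table shard to the target node, taking the
		 * shard metadata lock to serialize with concurrent placement changes.
		 */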
		ShardInterval *shardInterval = NULL;
		foreach_ptr(shardInterval, referenceShardIntervalList)
		{
			uint64 shardId = shardInterval->shardId;

			LockShardDistributionMetadata(shardId, ExclusiveLock);

			ReplicateShardToNode(shardInterval, nodeName, nodePort);
		}

		/* create foreign constraints between reference tables */
		foreach_ptr(shardInterval, referenceShardIntervalList)
		{
			char *tableOwner = TableOwner(shardInterval->relationId);
			List *commandList = CopyShardForeignConstraintCommandList(shardInterval);

			SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner,
													  commandList);
		}
	}
}