1 /*-------------------------------------------------------------------------
2  *
3  * metadata_sync.c
4  *
5  * Routines for synchronizing metadata to all workers.
6  *
7  * Copyright (c) Citus Data, Inc.
8  *
9  * $Id$
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #include "postgres.h"
15 #include "miscadmin.h"
16 
17 #include <signal.h>
18 #include <sys/stat.h>
19 #include <unistd.h>
20 
21 #include "access/genam.h"
22 #include "access/heapam.h"
23 #include "access/htup_details.h"
24 #include "access/nbtree.h"
25 #include "access/sysattr.h"
26 #include "access/xact.h"
27 #include "catalog/dependency.h"
28 #include "catalog/indexing.h"
29 #include "catalog/pg_am.h"
30 #include "catalog/pg_attrdef.h"
31 #include "catalog/pg_depend.h"
32 #include "catalog/pg_foreign_server.h"
33 #include "catalog/pg_namespace.h"
34 #include "catalog/pg_type.h"
35 #include "commands/async.h"
36 #include "distributed/argutils.h"
37 #include "distributed/backend_data.h"
38 #include "distributed/citus_ruleutils.h"
39 #include "distributed/colocation_utils.h"
40 #include "distributed/commands.h"
41 #include "distributed/deparser.h"
42 #include "distributed/distribution_column.h"
43 #include "distributed/listutils.h"
44 #include "distributed/metadata_utility.h"
45 #include "distributed/coordinator_protocol.h"
46 #include "distributed/maintenanced.h"
47 #include "distributed/metadata_cache.h"
48 #include "distributed/metadata_sync.h"
49 #include "distributed/metadata/distobject.h"
50 #include "distributed/multi_executor.h"
51 #include "distributed/multi_join_order.h"
52 #include "distributed/multi_partitioning_utils.h"
53 #include "distributed/multi_physical_planner.h"
54 #include "distributed/pg_dist_node.h"
55 #include "distributed/pg_dist_shard.h"
56 #include "distributed/relation_access_tracking.h"
57 #include "distributed/remote_commands.h"
58 #include "distributed/resource_lock.h"
59 #include "distributed/worker_manager.h"
60 #include "distributed/worker_protocol.h"
61 #include "distributed/worker_transaction.h"
62 #include "distributed/version_compat.h"
63 #include "executor/spi.h"
64 #include "foreign/foreign.h"
65 #include "miscadmin.h"
66 #include "nodes/pg_list.h"
67 #include "pgstat.h"
68 #include "postmaster/bgworker.h"
69 #include "postmaster/postmaster.h"
70 #include "storage/lmgr.h"
71 #include "utils/builtins.h"
72 #include "utils/fmgroids.h"
73 #include "utils/lsyscache.h"
74 #include "utils/memutils.h"
75 #include "utils/snapmgr.h"
76 #include "utils/syscache.h"
77 
78 
79 /* managed via a GUC */
80 char *EnableManualMetadataChangesForUser = "";
81 
82 
83 static void EnsureSequentialModeMetadataOperations(void);
84 static List * GetDistributedTableDDLEvents(Oid relationId);
85 static char * LocalGroupIdUpdateCommand(int32 groupId);
86 static List * SequenceDependencyCommandList(Oid relationId);
87 static char * TruncateTriggerCreateCommand(Oid relationId);
88 static char * SchemaOwnerName(Oid objectId);
89 static bool HasMetadataWorkers(void);
90 static List * DetachPartitionCommandList(void);
91 static bool SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError);
92 static void DropMetadataSnapshotOnNode(WorkerNode *workerNode);
93 static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId,
94 											  char *columnName);
95 static List * GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid,
96 													  AclItem *aclItem);
97 static GrantStmt * GenerateGrantOnSchemaStmtForRights(Oid roleOid,
98 													  Oid schemaOid,
99 													  char *permission,
100 													  bool withGrantOption);
101 static char * GenerateSetRoleQuery(Oid roleOid);
102 static void MetadataSyncSigTermHandler(SIGNAL_ARGS);
103 static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS);
104 
105 
106 static bool ShouldSkipMetadataChecks(void);
107 static void EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod,
108 										  int colocationId, char replicationModel,
109 										  Var *distributionKey);
110 static void EnsureCoordinatorInitiatedOperation(void);
111 static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
112 									  text *shardMinValue,
113 									  text *shardMaxValue);
114 static void EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId,
115 											   int64 placementId, int32 shardState,
116 											   int64 shardLength, int32 groupId);
117 
118 PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
119 PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
120 PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
121 
122 
123 /*
124  * Functions to modify metadata. Normally modifying metadata requires
125  * superuser. However, these functions can be called by superusers
126  * or regular users as long as the regular user owns the input object.
127  */
128 PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
129 PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
130 PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
131 PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
132 PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
133 PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation);
134 
135 
136 static bool got_SIGTERM = false;
137 static bool got_SIGALRM = false;
138 
139 #define METADATA_SYNC_APP_NAME "Citus Metadata Sync Daemon"
140 
141 
142 /*
143  * start_metadata_sync_to_node function sets the hasmetadata column of the given
144  * node to true, and then synchronizes the metadata on the node.
145  */
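/*
 * Example usage (with a hypothetical node name and port):
 *
 *   SELECT start_metadata_sync_to_node('worker-1', 5432);
 */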
146 Datum
147 start_metadata_sync_to_node(PG_FUNCTION_ARGS)
148 {
149 	CheckCitusVersion(ERROR);
150 
151 	text *nodeName = PG_GETARG_TEXT_P(0);
152 	int32 nodePort = PG_GETARG_INT32(1);
153 
154 	char *nodeNameString = text_to_cstring(nodeName);
155 
156 	StartMetadataSyncToNode(nodeNameString, nodePort);
157 
158 	PG_RETURN_VOID();
159 }
160 
161 
162 /*
163  * StartMetadataSyncToNode is the internal API for
164  * start_metadata_sync_to_node().
165  */
166 void
167 StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
168 {
169 	char *escapedNodeName = quote_literal_cstr(nodeNameString);
170 
171 	CheckCitusVersion(ERROR);
172 	EnsureCoordinator();
173 	EnsureSuperUser();
174 	EnsureModificationsCanRun();
175 
176 	EnsureSequentialModeMetadataOperations();
177 
178 	LockRelationOid(DistNodeRelationId(), ExclusiveLock);
179 
180 	WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort);
181 	if (workerNode == NULL)
182 	{
183 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
184 						errmsg("you cannot sync metadata to a non-existent node"),
185 						errhint("First, add the node with SELECT citus_add_node"
186 								"(%s,%d)", escapedNodeName, nodePort)));
187 	}
188 
189 	if (!workerNode->isActive)
190 	{
191 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
192 						errmsg("you cannot sync metadata to an inactive node"),
193 						errhint("First, activate the node with "
194 								"SELECT citus_activate_node(%s,%d)",
195 								escapedNodeName, nodePort)));
196 	}
197 
198 	if (NodeIsCoordinator(workerNode))
199 	{
200 		ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains "
201 								"metadata, skipping syncing the metadata",
202 								nodeNameString, nodePort)));
203 		return;
204 	}
205 
206 	UseCoordinatedTransaction();
207 
208 	/*
209 	 * One would normally expect to set hasmetadata first, and then sync the metadata.
210 	 * However, at this point we reverse the order: we first set metadatasynced,
211 	 * and then hasmetadata, since setting columns for
212 	 * nodes with metadatasynced==false could cause errors.
213 	 * (See ErrorIfAnyMetadataNodeOutOfSync)
214 	 * We can safely do that because we are in a coordinated transaction and the changes
215 	 * are only visible to our own transaction.
216 	 * If anything goes wrong, we are going to rollback all the changes.
217 	 */
218 	workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
219 								 BoolGetDatum(true));
220 	workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum(
221 									 true));
222 
223 	if (!NodeIsPrimary(workerNode))
224 	{
225 		/*
226 		 * If this is a secondary node we can't actually sync metadata to it; we assume
227 		 * the primary node is receiving metadata.
228 		 */
229 		return;
230 	}
231 
232 	/* fail if metadata synchronization doesn't succeed */
233 	bool raiseInterrupts = true;
234 	SyncMetadataSnapshotToNode(workerNode, raiseInterrupts);
235 }
236 
237 
238 /*
239  * EnsureSequentialModeMetadataOperations makes sure that the current transaction is
240  * already in sequential mode, or can still safely be put in sequential mode; it
241  * errors out if that is not possible. The error contains information for the user to
242  * retry the transaction with sequential mode set from the beginning.
243  *
244  * Only one instance of a metadata object (e.g., a distributed table on the workers)
245  * exists, and it is potentially used by multiple other shards/connections. To make sure
246  * all shards/connections in the transaction can interact with it, the metadata needs to
247  * be visible on all connections used by the transaction, meaning we can only use one
248  * connection per node.
249  */
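/*
 * If a parallel operation already ran in the transaction, the error below asks the
 * user to retry roughly as follows (hypothetical node name and port):
 *
 *   BEGIN;
 *   SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
 *   SELECT start_metadata_sync_to_node('worker-1', 5432);
 *   COMMIT;
 */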
250 static void
251 EnsureSequentialModeMetadataOperations(void)
252 {
253 	if (!IsTransactionBlock())
254 	{
255 		/* we do not need to switch to sequential mode if we are not in a transaction */
256 		return;
257 	}
258 
259 	if (ParallelQueryExecutedInTransaction())
260 	{
261 		ereport(ERROR, (errmsg(
262 							"cannot execute metadata syncing operation because there was a "
263 							"parallel operation on a distributed table in the "
264 							"transaction"),
265 						errdetail("When modifying metadata, Citus needs to "
266 								  "perform all operations over a single connection per "
267 								  "node to ensure consistency."),
268 						errhint("Try re-running the transaction with "
269 								"\"SET LOCAL citus.multi_shard_modify_mode TO "
270 								"\'sequential\';\"")));
271 	}
272 
273 	ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
274 					 errdetail("Metadata synced or stopped syncing. To make "
275 							   "sure subsequent commands see the metadata correctly "
276 							   "we need to make sure to use only one connection for "
277 							   "all future commands")));
278 	SetLocalMultiShardModifyModeToSequential();
279 }
280 
281 
282 /*
283  * stop_metadata_sync_to_node function sets the hasmetadata column of the specified node
284  * to false in the pg_dist_node table, thus indicating that the specified worker node
285  * does not receive DDL changes anymore and cannot be used for issuing queries.
286  */
287 Datum
288 stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
289 {
290 	CheckCitusVersion(ERROR);
291 	EnsureCoordinator();
292 	EnsureSuperUser();
293 
294 	text *nodeName = PG_GETARG_TEXT_P(0);
295 	int32 nodePort = PG_GETARG_INT32(1);
296 	bool clearMetadata = PG_GETARG_BOOL(2);
297 	char *nodeNameString = text_to_cstring(nodeName);
298 
299 	LockRelationOid(DistNodeRelationId(), ExclusiveLock);
300 
301 	WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeNameString, nodePort);
302 	if (workerNode == NULL)
303 	{
304 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
305 						errmsg("node (%s,%d) does not exist", nodeNameString, nodePort)));
306 	}
307 
308 	if (NodeIsCoordinator(workerNode))
309 	{
310 		ereport(NOTICE, (errmsg("node (%s,%d) is the coordinator and should have "
311 								"metadata, skipping stopping the metadata sync",
312 								nodeNameString, nodePort)));
313 		PG_RETURN_VOID();
314 	}
315 
316 	if (clearMetadata)
317 	{
318 		if (NodeIsPrimary(workerNode))
319 		{
320 			ereport(NOTICE, (errmsg("dropping metadata on the node (%s,%d)",
321 									nodeNameString, nodePort)));
322 			DropMetadataSnapshotOnNode(workerNode);
323 		}
324 		else
325 		{
326 			/*
327 			 * If this is a secondary node we can't actually clear metadata from it,
328 			 * we assume the primary node is cleared.
329 			 */
330 			ereport(NOTICE, (errmsg("(%s,%d) is a secondary node: to clear the metadata,"
331 									" you should clear metadata from the primary node",
332 									nodeNameString, nodePort)));
333 		}
334 	}
335 
336 	workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum(
337 									 false));
338 	workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
339 								 BoolGetDatum(false));
340 
341 	PG_RETURN_VOID();
342 }
343 
344 
345 /*
346  * ClusterHasKnownMetadataWorkers returns true if the node executing the function
347  * knows at least one worker with metadata. We do it by checking that either
348  * (a) the node that executes the function is itself a worker with metadata, or
349  * (b) the coordinator knows at least one worker with metadata.
350  */
351 bool
352 ClusterHasKnownMetadataWorkers()
353 {
354 	bool workerWithMetadata = false;
355 
356 	if (!IsCoordinator())
357 	{
358 		workerWithMetadata = true;
359 	}
360 
361 	if (workerWithMetadata || HasMetadataWorkers())
362 	{
363 		return true;
364 	}
365 
366 	return false;
367 }
368 
369 
370 /*
371  * ShouldSyncTableMetadata checks if the metadata of a distributed table should be
372  * propagated to metadata workers, i.e. the table is an MX table or reference table.
373  * Tables with the streaming replication model (i.e., RF=1) and hash distribution are
374  * considered MX tables, while tables without a distribution key are reference tables.
375  */
376 bool
377 ShouldSyncTableMetadata(Oid relationId)
378 {
379 	if (!OidIsValid(relationId) || !IsCitusTable(relationId))
380 	{
381 		return false;
382 	}
383 
384 	CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
385 
386 	bool streamingReplicated =
387 		(tableEntry->replicationModel == REPLICATION_MODEL_STREAMING);
388 
389 	bool mxTable = (streamingReplicated && IsCitusTableTypeCacheEntry(tableEntry,
390 																	  HASH_DISTRIBUTED));
391 	if (mxTable || IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
392 	{
393 		return true;
394 	}
395 	else
396 	{
397 		return false;
398 	}
399 }
400 
401 
402 /*
403  * SyncMetadataSnapshotToNode does the following:
404  *  1. Sets the localGroupId on the worker so the worker knows which tuple in
405  *     pg_dist_node represents itself.
406  *  2. Recreates the distributed metadata on the given worker.
407  * If raiseOnError is true, it errors out if synchronization fails.
408  */
409 static bool
410 SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
411 {
412 	char *currentUser = CurrentUserName();
413 
414 	/* generate and add the local group id's update query */
415 	char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
416 
417 	/* generate the queries which drop the metadata */
418 	List *dropMetadataCommandList = MetadataDropCommands();
419 
420 	/* generate the queries which create the metadata from scratch */
421 	List *createMetadataCommandList = MetadataCreateCommands();
422 
423 	List *recreateMetadataSnapshotCommandList = list_make1(localGroupIdUpdateCommand);
424 	recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
425 													  dropMetadataCommandList);
426 	recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
427 													  createMetadataCommandList);
428 
429 	/*
430 	 * Send the snapshot recreation commands in a single remote transaction and
431 	 * if requested, error out in any kind of failure. Note that it is not
432 	 * required to send createMetadataSnapshotCommandList in the same transaction
433 	 * that we send nodeDeleteCommand and nodeInsertCommand commands below.
434 	 */
435 	if (raiseOnError)
436 	{
437 		SendCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
438 														workerNode->workerPort,
439 														currentUser,
440 														recreateMetadataSnapshotCommandList);
441 		return true;
442 	}
443 	else
444 	{
445 		bool success =
446 			SendOptionalCommandListToWorkerInCoordinatedTransaction(
447 				workerNode->workerName, workerNode->workerPort,
448 				currentUser, recreateMetadataSnapshotCommandList);
449 
450 		return success;
451 	}
452 }
453 
454 
455 /*
456  * DropMetadataSnapshotOnNode creates the queries which drop the metadata and sends them
457  * to the worker given as parameter.
458  */
459 static void
460 DropMetadataSnapshotOnNode(WorkerNode *workerNode)
461 {
462 	EnsureSequentialModeMetadataOperations();
463 
464 	char *userName = CurrentUserName();
465 
466 	/* generate the queries which drop the metadata */
467 	List *dropMetadataCommandList = MetadataDropCommands();
468 
469 	dropMetadataCommandList = lappend(dropMetadataCommandList,
470 									  LocalGroupIdUpdateCommand(0));
471 
472 	SendOptionalCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
473 															workerNode->workerPort,
474 															userName,
475 															dropMetadataCommandList);
476 }
477 
478 
479 /*
480  * MetadataCreateCommands returns the list of queries that are
481  * required to create the current metadata snapshot of the node on which the
482  * function is called. The metadata snapshot commands include the
483  * following queries:
484  *
485  * (i)   Query that populates pg_dist_node table
486  * (ii)  Queries that create the clustered tables (including foreign keys,
487  *        partitioning hierarchy etc.)
488  * (iii) Queries that populate pg_dist_partition table referenced by (ii)
489  * (iv)  Queries that populate pg_dist_shard table referenced by (iii)
490  * (v)   Queries that populate pg_dist_placement table referenced by (iv)
491  */
492 List *
493 MetadataCreateCommands(void)
494 {
495 	List *metadataSnapshotCommandList = NIL;
496 	List *distributedTableList = CitusTableList();
497 	List *propagatedTableList = NIL;
498 	bool includeNodesFromOtherClusters = true;
499 	List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters);
500 	IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS;
501 
502 	/* make sure we have deterministic output for our tests */
503 	workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
504 
505 	/* generate insert command for pg_dist_node table */
506 	char *nodeListInsertCommand = NodeListInsertCommand(workerNodeList);
507 	metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
508 										  nodeListInsertCommand);
509 
510 	/* create the list of tables whose metadata will be created */
511 	CitusTableCacheEntry *cacheEntry = NULL;
512 	foreach_ptr(cacheEntry, distributedTableList)
513 	{
514 		if (ShouldSyncTableMetadata(cacheEntry->relationId))
515 		{
516 			propagatedTableList = lappend(propagatedTableList, cacheEntry);
517 		}
518 	}
519 
520 	/* create the tables, but not the metadata */
521 	foreach_ptr(cacheEntry, propagatedTableList)
522 	{
523 		Oid relationId = cacheEntry->relationId;
524 		ObjectAddress tableAddress = { 0 };
525 
526 		if (IsTableOwnedByExtension(relationId))
527 		{
528 			/* skip table creation when the Citus table is owned by an extension */
529 			continue;
530 		}
531 
532 		List *ddlCommandList = GetFullTableCreationCommands(relationId,
533 															includeSequenceDefaults);
534 		char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
535 
536 		/*
537 		 * Tables might have dependencies on different objects. Since we create shards for
538 		 * the table via multiple sessions, these objects are created over their own connection
539 		 * and committed immediately so that they become visible to all sessions creating shards.
540 		 */
541 		ObjectAddressSet(tableAddress, RelationRelationId, relationId);
542 		EnsureDependenciesExistOnAllNodes(&tableAddress);
543 
544 		/*
545 		 * Ensure sequence dependencies and mark them as distributed
546 		 */
547 		List *attnumList = NIL;
548 		List *dependentSequenceList = NIL;
549 		GetDependentSequencesWithRelation(relationId, &attnumList,
550 										  &dependentSequenceList, 0);
551 		MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList);
552 
553 		List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
554 		metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
555 												  workerSequenceDDLCommands);
556 
557 		/* ddlCommandList contains TableDDLCommand information, need to materialize */
558 		TableDDLCommand *tableDDLCommand = NULL;
559 		foreach_ptr(tableDDLCommand, ddlCommandList)
560 		{
561 			Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
562 			metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
563 												  GetTableDDLCommand(tableDDLCommand));
564 		}
565 
566 		metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
567 											  tableOwnerResetCommand);
568 
569 		List *sequenceDependencyCommandList = SequenceDependencyCommandList(
570 			relationId);
571 		metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
572 												  sequenceDependencyCommandList);
573 	}
574 
575 	/* construct the foreign key constraints after all tables are created */
576 	foreach_ptr(cacheEntry, propagatedTableList)
577 	{
578 		Oid relationId = cacheEntry->relationId;
579 
580 		if (IsTableOwnedByExtension(relationId))
581 		{
582 			/* skip foreign key creation when the Citus table is owned by an extension */
583 			continue;
584 		}
585 
586 		List *foreignConstraintCommands =
587 			GetReferencingForeignConstaintCommands(relationId);
588 
589 		metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
590 												  foreignConstraintCommands);
591 	}
592 
593 	/* construct partitioning hierarchy after all tables are created */
594 	foreach_ptr(cacheEntry, propagatedTableList)
595 	{
596 		Oid relationId = cacheEntry->relationId;
597 
598 		if (IsTableOwnedByExtension(relationId))
599 		{
600 			/* skip partition creation when the Citus table is owned by an extension */
601 			continue;
602 		}
603 
604 		if (PartitionTable(relationId))
605 		{
606 			char *alterTableAttachPartitionCommands =
607 				GenerateAlterTableAttachPartitionCommand(relationId);
608 
609 			metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
610 												  alterTableAttachPartitionCommands);
611 		}
612 	}
613 
614 	/* after all tables are created, create the metadata */
615 	foreach_ptr(cacheEntry, propagatedTableList)
616 	{
617 		Oid clusteredTableId = cacheEntry->relationId;
618 
619 		/* add the table metadata command first */
620 		char *metadataCommand = DistributionCreateCommand(cacheEntry);
621 		metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
622 											  metadataCommand);
623 
624 		/* add the truncate trigger command after the table becomes distributed */
625 		char *truncateTriggerCreateCommand =
626 			TruncateTriggerCreateCommand(cacheEntry->relationId);
627 		metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
628 											  truncateTriggerCreateCommand);
629 
630 		/* add the pg_dist_shard{,placement} entries */
631 		List *shardIntervalList = LoadShardIntervalList(clusteredTableId);
632 		List *shardCreateCommandList = ShardListInsertCommand(shardIntervalList);
633 
634 		metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
635 												  shardCreateCommandList);
636 	}
637 
638 	return metadataSnapshotCommandList;
639 }
640 
641 
642 /*
643  * GetDistributedTableDDLEvents returns the full set of DDL commands necessary to
644  * create the given distributed table on a worker. The list includes setting up any
645  * sequences, setting the owner of the table, inserting table and shard metadata,
646  * setting the truncate trigger and foreign key constraints.
647  */
648 static List *
649 GetDistributedTableDDLEvents(Oid relationId)
650 {
651 	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
652 
653 	List *commandList = NIL;
654 	IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS;
655 
656 	/* if the table is owned by an extension we only propagate pg_dist_* records */
657 	bool tableOwnedByExtension = IsTableOwnedByExtension(relationId);
658 	if (!tableOwnedByExtension)
659 	{
660 		/* commands to create sequences */
661 		List *sequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
662 		commandList = list_concat(commandList, sequenceDDLCommands);
663 
664 		/*
665 		 * Commands to create the table; these commands are TableDDLCommands, so let's
666 		 * materialize them to the non-sharded version.
667 		 */
668 		List *tableDDLCommands = GetFullTableCreationCommands(relationId,
669 															  includeSequenceDefaults);
670 		TableDDLCommand *tableDDLCommand = NULL;
671 		foreach_ptr(tableDDLCommand, tableDDLCommands)
672 		{
673 			Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
674 			commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand));
675 		}
676 
677 		/* command to associate sequences with table */
678 		List *sequenceDependencyCommandList = SequenceDependencyCommandList(
679 			relationId);
680 		commandList = list_concat(commandList, sequenceDependencyCommandList);
681 	}
682 
683 	/* command to insert pg_dist_partition entry */
684 	char *metadataCommand = DistributionCreateCommand(cacheEntry);
685 	commandList = lappend(commandList, metadataCommand);
686 
687 	/* commands to create the truncate trigger of the table */
688 	char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
689 	commandList = lappend(commandList, truncateTriggerCreateCommand);
690 
691 	/* commands to insert pg_dist_shard & pg_dist_placement entries */
692 	List *shardIntervalList = LoadShardIntervalList(relationId);
693 	List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList);
694 	commandList = list_concat(commandList, shardMetadataInsertCommandList);
695 
696 	if (!tableOwnedByExtension)
697 	{
698 		/* commands to create foreign key constraints */
699 		List *foreignConstraintCommands =
700 			GetReferencingForeignConstaintCommands(relationId);
701 		commandList = list_concat(commandList, foreignConstraintCommands);
702 
703 		/* commands to create partitioning hierarchy */
704 		if (PartitionTable(relationId))
705 		{
706 			char *alterTableAttachPartitionCommands =
707 				GenerateAlterTableAttachPartitionCommand(relationId);
708 			commandList = lappend(commandList, alterTableAttachPartitionCommands);
709 		}
710 	}
711 
712 	return commandList;
713 }
714 
715 
716 /*
717  * MetadataDropCommands returns list of queries that are required to
718  * drop all the metadata of the node that are related to clustered tables.
719  * The drop metadata snapshot commands includes the following queries:
720  *
721  * (i)   Query to disable DDL propagation (necessary for (ii))
722  * (ii)  Queries that DETACH all partitions of distributed tables
723  * (iii) Queries that delete all the rows from pg_dist_node table
724  * (iv)  Queries that drop the clustered tables and remove their references from
725  *        the pg_dist_partition. Note that distributed relation ids are gathered
726  *        from the worker itself to prevent dropping any non-distributed tables
727  *        with the same name.
728  * (v)   Queries that delete all the rows from pg_dist_shard table referenced by (iv)
729  * (vi)  Queries that delete all the rows from pg_dist_placement table
730  *        referenced by (v)
731  */
732 List *
733 MetadataDropCommands(void)
734 {
735 	List *dropSnapshotCommandList = NIL;
736 	List *detachPartitionCommandList = DetachPartitionCommandList();
737 
738 	dropSnapshotCommandList = list_concat(dropSnapshotCommandList,
739 										  detachPartitionCommandList);
740 
741 	dropSnapshotCommandList = lappend(dropSnapshotCommandList,
742 									  REMOVE_ALL_CLUSTERED_TABLES_COMMAND);
743 
744 	dropSnapshotCommandList = lappend(dropSnapshotCommandList, DELETE_ALL_NODES);
745 
746 	return dropSnapshotCommandList;
747 }
748 
749 
750 /*
751  * NodeListInsertCommand generates a single multi-row INSERT command that can be
752  * executed to insert the nodes that are in workerNodeList into the pg_dist_node table.
753  */
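/*
 * For illustration, with two hypothetical primary workers, the generated command
 * looks like:
 *
 *   INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack,
 *     hasmetadata, metadatasynced, isactive, noderole, nodecluster, shouldhaveshards)
 *   VALUES
 *     (2, 2, 'worker-1', 5432, 'default', TRUE, TRUE, TRUE, 'primary'::noderole,
 *      'default', TRUE),
 *     (3, 3, 'worker-2', 5432, 'default', TRUE, TRUE, TRUE, 'primary'::noderole,
 *      'default', TRUE)
 */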
754 char *
755 NodeListInsertCommand(List *workerNodeList)
756 {
757 	StringInfo nodeListInsertCommand = makeStringInfo();
758 	int workerCount = list_length(workerNodeList);
759 	int processedWorkerNodeCount = 0;
760 	Oid primaryRole = PrimaryNodeRoleId();
761 
762 	/* if there are no workers, return an empty command string */
763 	if (workerCount == 0)
764 	{
765 		return nodeListInsertCommand->data;
766 	}
767 
768 	if (primaryRole == InvalidOid)
769 	{
770 		ereport(ERROR, (errmsg("bad metadata, noderole does not exist"),
771 						errdetail("you should never see this, please submit "
772 								  "a bug report"),
773 						errhint("run ALTER EXTENSION citus UPDATE and try again")));
774 	}
775 
776 	/* generate the query without any values yet */
777 	appendStringInfo(nodeListInsertCommand,
778 					 "INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, "
779 					 "noderack, hasmetadata, metadatasynced, isactive, noderole, "
780 					 "nodecluster, shouldhaveshards) VALUES ");
781 
782 	/* iterate over the worker nodes, add the values */
783 	WorkerNode *workerNode = NULL;
784 	foreach_ptr(workerNode, workerNodeList)
785 	{
786 		char *hasMetadataString = workerNode->hasMetadata ? "TRUE" : "FALSE";
787 		char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE";
788 		char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE";
789 		char *shouldHaveShards = workerNode->shouldHaveShards ? "TRUE" : "FALSE";
790 
791 		Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole);
792 		Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum);
793 		char *nodeRoleString = DatumGetCString(nodeRoleStringDatum);
794 
795 		appendStringInfo(nodeListInsertCommand,
796 						 "(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s)",
797 						 workerNode->nodeId,
798 						 workerNode->groupId,
799 						 quote_literal_cstr(workerNode->workerName),
800 						 workerNode->workerPort,
801 						 quote_literal_cstr(workerNode->workerRack),
802 						 hasMetadataString,
803 						 metadataSyncedString,
804 						 isActiveString,
805 						 nodeRoleString,
806 						 quote_literal_cstr(workerNode->nodeCluster),
807 						 shouldHaveShards);
808 
809 		processedWorkerNodeCount++;
810 		if (processedWorkerNodeCount != workerCount)
811 		{
812 			appendStringInfo(nodeListInsertCommand, ",");
813 		}
814 	}
815 
816 	return nodeListInsertCommand->data;
817 }
818 
819 
820 /*
821  * DistributionCreateCommand generates a command that can be
822  * executed to replicate the metadata for a distributed table.
823  */
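/*
 * For illustration, for a hypothetical hash-distributed table the generated command
 * looks like:
 *
 *   SELECT citus_internal_add_partition_metadata
 *   ('public.orders'::regclass, 'h', 'order_id', 3, 's')
 */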
824 char *
825 DistributionCreateCommand(CitusTableCacheEntry *cacheEntry)
826 {
827 	StringInfo insertDistributionCommand = makeStringInfo();
828 	Oid relationId = cacheEntry->relationId;
829 	char distributionMethod = cacheEntry->partitionMethod;
830 	char *partitionKeyString = cacheEntry->partitionKeyString;
831 	char *qualifiedRelationName =
832 		generate_qualified_relation_name(relationId);
833 	uint32 colocationId = cacheEntry->colocationId;
834 	char replicationModel = cacheEntry->replicationModel;
835 	StringInfo tablePartitionKeyNameString = makeStringInfo();
836 
837 	if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
838 	{
839 		appendStringInfo(tablePartitionKeyNameString, "NULL");
840 	}
841 	else
842 	{
843 		char *partitionKeyColumnName =
844 			ColumnToColumnName(relationId, partitionKeyString);
845 		appendStringInfo(tablePartitionKeyNameString, "%s",
846 						 quote_literal_cstr(partitionKeyColumnName));
847 	}
848 
849 	appendStringInfo(insertDistributionCommand,
850 					 "SELECT citus_internal_add_partition_metadata "
851 					 "(%s::regclass, '%c', %s, %d, '%c')",
852 					 quote_literal_cstr(qualifiedRelationName),
853 					 distributionMethod,
854 					 tablePartitionKeyNameString->data,
855 					 colocationId,
856 					 replicationModel);
857 
858 	return insertDistributionCommand->data;
859 }
860 
861 
862 /*
863  * DistributionDeleteCommand generates a command that can be executed
864  * to drop a distributed table and its metadata on a remote node.
865  */
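/*
 * For illustration (hypothetical table):
 *
 *   SELECT worker_drop_distributed_table('public.orders')
 */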
866 char *
867 DistributionDeleteCommand(const char *schemaName, const char *tableName)
868 {
869 	StringInfo deleteDistributionCommand = makeStringInfo();
870 
871 	char *distributedRelationName = quote_qualified_identifier(schemaName, tableName);
872 
873 	appendStringInfo(deleteDistributionCommand,
874 					 "SELECT worker_drop_distributed_table(%s)",
875 					 quote_literal_cstr(distributedRelationName));
876 
877 	return deleteDistributionCommand->data;
878 }
879 
880 
881 /*
882  * TableOwnerResetCommand generates a command that can be executed
883  * to reset the table owner.
884  */
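/*
 * For illustration (hypothetical table and owner):
 *
 *   ALTER TABLE public.orders OWNER TO alice
 */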
885 char *
886 TableOwnerResetCommand(Oid relationId)
887 {
888 	StringInfo ownerResetCommand = makeStringInfo();
889 	char *qualifiedRelationName = generate_qualified_relation_name(relationId);
890 	char *tableOwnerName = TableOwner(relationId);
891 
892 	appendStringInfo(ownerResetCommand,
893 					 "ALTER TABLE %s OWNER TO %s",
894 					 qualifiedRelationName,
895 					 quote_identifier(tableOwnerName));
896 
897 	return ownerResetCommand->data;
898 }
899 
900 
901 /*
902  * ShardListInsertCommand generates the commands that can be
903  * executed to replicate shard and shard placement metadata for the
904  * given shard intervals: one command inserts the shard metadata and
905  * another inserts the corresponding placement metadata.
906  */
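/*
 * For illustration, for a single hypothetical shard with one placement, the two
 * generated commands look roughly like:
 *
 *   WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
 *     AS (VALUES ('public.orders'::regclass, 102008, 't'::"char", '-2147483648', '-1'))
 *   SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype,
 *                                            shardminvalue, shardmaxvalue)
 *   FROM shard_data;
 *
 *   WITH placement_data(shardid, shardstate, shardlength, groupid, placementid)
 *     AS (VALUES (102008, 1, 0, 2, 201))
 *   SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength,
 *                                                groupid, placementid)
 *   FROM placement_data;
 */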
907 List *
908 ShardListInsertCommand(List *shardIntervalList)
909 {
910 	List *commandList = NIL;
911 	int shardCount = list_length(shardIntervalList);
912 
913 	/* if there are no shards, return empty list */
914 	if (shardCount == 0)
915 	{
916 		return commandList;
917 	}
918 
919 	/* add placements to insertPlacementCommand */
920 	StringInfo insertPlacementCommand = makeStringInfo();
921 	appendStringInfo(insertPlacementCommand,
922 					 "WITH placement_data(shardid, shardstate, "
923 					 "shardlength, groupid, placementid)  AS (VALUES ");
924 
925 	ShardInterval *shardInterval = NULL;
926 	foreach_ptr(shardInterval, shardIntervalList)
927 	{
928 		uint64 shardId = shardInterval->shardId;
929 		List *shardPlacementList = ActiveShardPlacementList(shardId);
930 
931 		ShardPlacement *placement = NULL;
932 		foreach_ptr(placement, shardPlacementList)
933 		{
934 			appendStringInfo(insertPlacementCommand,
935 							 "(%ld, %d, %ld, %d, %ld)",
936 							 shardId,
937 							 placement->shardState,
938 							 placement->shardLength,
939 							 placement->groupId,
940 							 placement->placementId);
941 
942 			if (!(llast(shardPlacementList) == placement &&
943 				  llast(shardIntervalList) == shardInterval))
944 			{
945 				/*
946 				 * As long as this is not the last placement of the last shard,
947 				 * append the comma.
948 				 */
949 				appendStringInfo(insertPlacementCommand, ", ");
950 			}
951 		}
952 	}
953 
954 	appendStringInfo(insertPlacementCommand, ") ");
955 
956 	appendStringInfo(insertPlacementCommand,
957 					 "SELECT citus_internal_add_placement_metadata("
958 					 "shardid, shardstate, shardlength, groupid, placementid) "
959 					 "FROM placement_data;");
960 
961 	/* now add shards to insertShardCommand */
962 	StringInfo insertShardCommand = makeStringInfo();
963 	appendStringInfo(insertShardCommand,
964 					 "WITH shard_data(relationname, shardid, storagetype, "
965 					 "shardminvalue, shardmaxvalue)  AS (VALUES ");
966 
967 	foreach_ptr(shardInterval, shardIntervalList)
968 	{
969 		uint64 shardId = shardInterval->shardId;
970 		Oid distributedRelationId = shardInterval->relationId;
971 		char *qualifiedRelationName = generate_qualified_relation_name(
972 			distributedRelationId);
973 		StringInfo minHashToken = makeStringInfo();
974 		StringInfo maxHashToken = makeStringInfo();
975 
976 		if (shardInterval->minValueExists)
977 		{
978 			appendStringInfo(minHashToken, "'%d'", DatumGetInt32(
979 								 shardInterval->minValue));
980 		}
981 		else
982 		{
983 			appendStringInfo(minHashToken, "NULL");
984 		}
985 
986 		if (shardInterval->maxValueExists)
987 		{
988 			appendStringInfo(maxHashToken, "'%d'", DatumGetInt32(
989 								 shardInterval->maxValue));
990 		}
991 		else
992 		{
993 			appendStringInfo(maxHashToken, "NULL");
994 		}
995 
996 		appendStringInfo(insertShardCommand,
997 						 "(%s::regclass, %ld, '%c'::\"char\", %s, %s)",
998 						 quote_literal_cstr(qualifiedRelationName),
999 						 shardId,
1000 						 shardInterval->storageType,
1001 						 minHashToken->data,
1002 						 maxHashToken->data);
1003 
1004 		if (llast(shardIntervalList) != shardInterval)
1005 		{
1006 			appendStringInfo(insertShardCommand, ", ");
1007 		}
1008 	}
1009 
1010 	appendStringInfo(insertShardCommand, ") ");
1011 
1012 	appendStringInfo(insertShardCommand,
1013 					 "SELECT citus_internal_add_shard_metadata(relationname, shardid, "
1014 					 "storagetype, shardminvalue, shardmaxvalue) "
1015 					 "FROM shard_data;");
1016 
1017 	/* first insert shards, then the placements */
1018 	commandList = lappend(commandList, insertShardCommand->data);
1019 	commandList = lappend(commandList, insertPlacementCommand->data);
1020 
1021 	return commandList;
1022 }
1023 
1024 
1025 /*
1026  * NodeDeleteCommand generates a command that can be
1027  * executed to delete the metadata for a worker node.
1028  */
1029 char *
1030 NodeDeleteCommand(uint32 nodeId)
1031 {
1032 	StringInfo nodeDeleteCommand = makeStringInfo();
1033 
1034 	appendStringInfo(nodeDeleteCommand,
1035 					 "DELETE FROM pg_dist_node "
1036 					 "WHERE nodeid = %u", nodeId);
1037 
1038 	return nodeDeleteCommand->data;
1039 }
1040 
1041 
1042 /*
1043  * NodeStateUpdateCommand generates a command that can be executed to update
1044  * the isactive column of a node in the pg_dist_node table.
1045  */
1046 char *
1047 NodeStateUpdateCommand(uint32 nodeId, bool isActive)
1048 {
1049 	StringInfo nodeStateUpdateCommand = makeStringInfo();
1050 	char *isActiveString = isActive ? "TRUE" : "FALSE";
1051 
1052 	appendStringInfo(nodeStateUpdateCommand,
1053 					 "UPDATE pg_dist_node SET isactive = %s "
1054 					 "WHERE nodeid = %u", isActiveString, nodeId);
1055 
1056 	return nodeStateUpdateCommand->data;
1057 }
1058 
1059 
1060 /*
1061  * ShouldHaveShardsUpdateCommand generates a command that can be executed to
1062  * update the shouldhaveshards column of a node in the pg_dist_node table.
1063  */
1064 char *
1065 ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards)
1066 {
1067 	StringInfo nodeStateUpdateCommand = makeStringInfo();
1068 	char *shouldHaveShardsString = shouldHaveShards ? "TRUE" : "FALSE";
1069 
1070 	appendStringInfo(nodeStateUpdateCommand,
1071 					 "UPDATE pg_catalog.pg_dist_node SET shouldhaveshards = %s "
1072 					 "WHERE nodeid = %u", shouldHaveShardsString, nodeId);
1073 
1074 	return nodeStateUpdateCommand->data;
1075 }
1076 
1077 
1078 /*
1079  * ColocationIdUpdateCommand creates the SQL command to change the colocationId
1080  * of the given relation to the given colocationId in the pg_dist_partition
1081  * table.
1082  */
1083 char *
1084 ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
1085 {
1086 	StringInfo command = makeStringInfo();
1087 	char *qualifiedRelationName = generate_qualified_relation_name(relationId);
1088 	appendStringInfo(command,
1089 					 "SELECT citus_internal_update_relation_colocation(%s::regclass, %d)",
1090 					 quote_literal_cstr(qualifiedRelationName), colocationId);
1091 
1092 	return command->data;
1093 }
1094 
1095 
1096 /*
1097  * PlacementUpsertCommand creates a SQL command for upserting a pg_dist_placement
1098  * entry with the given properties. In the case of a conflict on placementId, the command
1099  * updates all properties (excluding the placementId) with the given ones.
1100  */
1101 char *
1102 PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
1103 					   uint64 shardLength, int32 groupId)
1104 {
1105 	StringInfo command = makeStringInfo();
1106 
1107 	appendStringInfo(command, UPSERT_PLACEMENT, shardId, shardState, shardLength,
1108 					 groupId, placementId);
1109 
1110 	return command->data;
1111 }
1112 
1113 
1114 /*
1115  * LocalGroupIdUpdateCommand creates the SQL command required to set the local group id
1116  * of a worker and returns the command in a string.
1117  */
1118 static char *
1119 LocalGroupIdUpdateCommand(int32 groupId)
1120 {
1121 	StringInfo updateCommand = makeStringInfo();
1122 
1123 	appendStringInfo(updateCommand, "UPDATE pg_dist_local_group SET groupid = %d",
1124 					 groupId);
1125 
1126 	return updateCommand->data;
1127 }
1128 
1129 
1130 /*
1131  * SequenceDDLCommandsForTable returns a list of commands which create sequences (and
1132  * their schemas) to run on workers before creating the relation. The sequence creation
1133  * commands are wrapped with a `worker_apply_sequence_command` call, which sets the
1134  * sequence space uniquely for each worker. Notice that this function is relevant only
1135  * during metadata propagation to workers and adds nothing to the list of sequence
1136  * commands if none of the workers is marked as receiving metadata changes.
1137  */
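/*
 * A sketch of the output for a hypothetical sequence and owner; the exact shape of the
 * wrapper comes from WORKER_APPLY_SEQUENCE_COMMAND, so the generated pair of commands
 * looks roughly like:
 *
 *   SELECT worker_apply_sequence_command('CREATE SEQUENCE public.orders_id_seq ...',
 *                                        'bigint')
 * and
 *   ALTER SEQUENCE public.orders_id_seq OWNER TO alice
 */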
1138 List *
1139 SequenceDDLCommandsForTable(Oid relationId)
1140 {
1141 	List *sequenceDDLList = NIL;
1142 
1143 	List *attnumList = NIL;
1144 	List *dependentSequenceList = NIL;
1145 	GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);
1146 
1147 	char *ownerName = TableOwner(relationId);
1148 
1149 	Oid sequenceOid = InvalidOid;
1150 	foreach_oid(sequenceOid, dependentSequenceList)
1151 	{
1152 		char *sequenceDef = pg_get_sequencedef_string(sequenceOid);
1153 		char *escapedSequenceDef = quote_literal_cstr(sequenceDef);
1154 		StringInfo wrappedSequenceDef = makeStringInfo();
1155 		StringInfo sequenceGrantStmt = makeStringInfo();
1156 		char *sequenceName = generate_qualified_relation_name(sequenceOid);
1157 		Form_pg_sequence sequenceData = pg_get_sequencedef(sequenceOid);
1158 		Oid sequenceTypeOid = sequenceData->seqtypid;
1159 		char *typeName = format_type_be(sequenceTypeOid);
1160 
1161 		/* create schema if needed */
1162 		appendStringInfo(wrappedSequenceDef,
1163 						 WORKER_APPLY_SEQUENCE_COMMAND,
1164 						 escapedSequenceDef,
1165 						 quote_literal_cstr(typeName));
1166 
1167 		appendStringInfo(sequenceGrantStmt,
1168 						 "ALTER SEQUENCE %s OWNER TO %s", sequenceName,
1169 						 quote_identifier(ownerName));
1170 
1171 		sequenceDDLList = lappend(sequenceDDLList, wrappedSequenceDef->data);
1172 		sequenceDDLList = lappend(sequenceDDLList, sequenceGrantStmt->data);
1173 	}
1174 
1175 	return sequenceDDLList;
1176 }
1177 
1178 
1179 /*
1180  * GetAttributeTypeOid returns the type OID of the attribute of the
1181  * provided relationId that has the provided attnum.
1182  */
1183 Oid
1184 GetAttributeTypeOid(Oid relationId, AttrNumber attnum)
1185 {
1186 	Oid resultOid = InvalidOid;
1187 
1188 	ScanKeyData key[2];
1189 
1190 	/* Grab an appropriate lock on the pg_attribute relation */
1191 	Relation attrel = table_open(AttributeRelationId, AccessShareLock);
1192 
1193 	/* Use the index to scan the attributes of the target relation up to the given attnum */
1194 	ScanKeyInit(&key[0],
1195 				Anum_pg_attribute_attrelid,
1196 				BTEqualStrategyNumber, F_OIDEQ,
1197 				ObjectIdGetDatum(relationId));
1198 	ScanKeyInit(&key[1],
1199 				Anum_pg_attribute_attnum,
1200 				BTLessEqualStrategyNumber, F_INT2LE,
1201 				Int16GetDatum(attnum));
1202 
1203 	SysScanDesc scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true, NULL, 2,
1204 										  key);
1205 
1206 	HeapTuple attributeTuple;
1207 	while (HeapTupleIsValid(attributeTuple = systable_getnext(scan)))
1208 	{
1209 		Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple);
1210 		resultOid = att->atttypid;
1211 	}
1212 
1213 	systable_endscan(scan);
1214 	table_close(attrel, AccessShareLock);
1215 
1216 	return resultOid;
1217 }
1218 
1219 
1220 /*
1221  * GetDependentSequencesWithRelation appends the attnum and id of sequences that
1222  * have a direct (owned sequences) or indirect dependency on the given relationId,
1223  * to the lists passed as NIL initially.
1224  * For both cases, we use the intermediate AttrDefault object from pg_depend.
1225  * If attnum is specified, we only return the sequences related to that
1226  * attribute of the relationId.
1227  */
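/*
 * For example, for a hypothetical table defined as
 *
 *   CREATE TABLE orders (order_id bigint DEFAULT nextval('orders_id_seq'));
 *
 * the lists would end up containing attnum 1 and the OID of orders_id_seq.
 */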
1228 void
1229 GetDependentSequencesWithRelation(Oid relationId, List **attnumList,
1230 								  List **dependentSequenceList, AttrNumber attnum)
1231 {
1232 	Assert(*attnumList == NIL && *dependentSequenceList == NIL);
1233 
1234 	List *attrdefResult = NIL;
1235 	List *attrdefAttnumResult = NIL;
1236 	ScanKeyData key[3];
1237 	HeapTuple tup;
1238 
1239 	Relation depRel = table_open(DependRelationId, AccessShareLock);
1240 
1241 	ScanKeyInit(&key[0],
1242 				Anum_pg_depend_refclassid,
1243 				BTEqualStrategyNumber, F_OIDEQ,
1244 				ObjectIdGetDatum(RelationRelationId));
1245 	ScanKeyInit(&key[1],
1246 				Anum_pg_depend_refobjid,
1247 				BTEqualStrategyNumber, F_OIDEQ,
1248 				ObjectIdGetDatum(relationId));
1249 	if (attnum)
1250 	{
1251 		ScanKeyInit(&key[2],
1252 					Anum_pg_depend_refobjsubid,
1253 					BTEqualStrategyNumber, F_INT4EQ,
1254 					Int32GetDatum(attnum));
1255 	}
1256 
1257 	SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true,
1258 										  NULL, attnum ? 3 : 2, key);
1259 
1260 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
1261 	{
1262 		Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
1263 
1264 		if (deprec->classid == AttrDefaultRelationId &&
1265 			deprec->objsubid == 0 &&
1266 			deprec->refobjsubid != 0 &&
1267 			deprec->deptype == DEPENDENCY_AUTO)
1268 		{
1269 			attrdefResult = lappend_oid(attrdefResult, deprec->objid);
1270 			attrdefAttnumResult = lappend_int(attrdefAttnumResult, deprec->refobjsubid);
1271 		}
1272 	}
1273 
1274 	systable_endscan(scan);
1275 
1276 	table_close(depRel, AccessShareLock);
1277 
1278 	ListCell *attrdefOidCell = NULL;
1279 	ListCell *attrdefAttnumCell = NULL;
1280 	forboth(attrdefOidCell, attrdefResult, attrdefAttnumCell, attrdefAttnumResult)
1281 	{
1282 		Oid attrdefOid = lfirst_oid(attrdefOidCell);
1283 		AttrNumber attrdefAttnum = lfirst_int(attrdefAttnumCell);
1284 
1285 		List *sequencesFromAttrDef = GetSequencesFromAttrDef(attrdefOid);
1286 
1287 		/* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */
1288 		if (list_length(sequencesFromAttrDef) > 1)
1289 		{
1290 			ereport(ERROR, (errmsg(
1291 								"More than one sequence in a column default"
1292 								" is not supported for distribution "
1293 								"or for adding local tables to metadata")));
1294 		}
1295 
1296 		if (list_length(sequencesFromAttrDef) == 1)
1297 		{
1298 			*dependentSequenceList = list_concat(*dependentSequenceList,
1299 												 sequencesFromAttrDef);
1300 			*attnumList = lappend_int(*attnumList, attrdefAttnum);
1301 		}
1302 	}
1303 }
1304 
1305 
1306 /*
1307  * GetSequencesFromAttrDef returns a list of sequence OIDs that have
1308  * a dependency on the given attrdefOid in pg_depend.
1309  */
1310 List *
1311 GetSequencesFromAttrDef(Oid attrdefOid)
1312 {
1313 	List *sequencesResult = NIL;
1314 	ScanKeyData key[2];
1315 	HeapTuple tup;
1316 
1317 	Relation depRel = table_open(DependRelationId, AccessShareLock);
1318 
1319 	ScanKeyInit(&key[0],
1320 				Anum_pg_depend_classid,
1321 				BTEqualStrategyNumber, F_OIDEQ,
1322 				ObjectIdGetDatum(AttrDefaultRelationId));
1323 	ScanKeyInit(&key[1],
1324 				Anum_pg_depend_objid,
1325 				BTEqualStrategyNumber, F_OIDEQ,
1326 				ObjectIdGetDatum(attrdefOid));
1327 
1328 	SysScanDesc scan = systable_beginscan(depRel, DependDependerIndexId, true,
1329 										  NULL, 2, key);
1330 
1331 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
1332 	{
1333 		Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
1334 
1335 		if (deprec->refclassid == RelationRelationId &&
1336 			deprec->deptype == DEPENDENCY_NORMAL &&
1337 			get_rel_relkind(deprec->refobjid) == RELKIND_SEQUENCE)
1338 		{
1339 			sequencesResult = lappend_oid(sequencesResult, deprec->refobjid);
1340 		}
1341 	}
1342 
1343 	systable_endscan(scan);
1344 
1345 	table_close(depRel, AccessShareLock);
1346 
1347 	return sequencesResult;
1348 }
1349 
1350 
1351 /*
1352  * SequenceDependencyCommandList generates commands to record the dependency
1353  * of sequences on tables on the worker. This dependency does not exist by
1354  * default since the sequences and table are created separately, but it is
1355  * necessary to ensure that the sequence is dropped when the table is
1356  * dropped.
1357  */
1358 static List *
1359 SequenceDependencyCommandList(Oid relationId)
1360 {
1361 	List *sequenceCommandList = NIL;
1362 	List *columnNameList = NIL;
1363 	List *sequenceIdList = NIL;
1364 
1365 	ExtractDefaultColumnsAndOwnedSequences(relationId, &columnNameList, &sequenceIdList);
1366 
1367 	ListCell *columnNameCell = NULL;
1368 	ListCell *sequenceIdCell = NULL;
1369 
1370 	forboth(columnNameCell, columnNameList, sequenceIdCell, sequenceIdList)
1371 	{
1372 		char *columnName = lfirst(columnNameCell);
1373 		Oid sequenceId = lfirst_oid(sequenceIdCell);
1374 
1375 		if (!OidIsValid(sequenceId))
1376 		{
1377 			/*
1378 			 * ExtractDefaultColumnsAndOwnedSequences returns entries for all columns,
1379 			 * but with 0 sequence ID unless there is default nextval(..).
1380 			 */
1381 			continue;
1382 		}
1383 
1384 		char *sequenceDependencyCommand =
1385 			CreateSequenceDependencyCommand(relationId, sequenceId, columnName);
1386 
1387 		sequenceCommandList = lappend(sequenceCommandList,
1388 									  sequenceDependencyCommand);
1389 	}
1390 
1391 	return sequenceCommandList;
1392 }
1393 
1394 
1395 /*
1396  * CreateSequenceDependencyCommand generates a query string for calling
1397  * worker_record_sequence_dependency on the worker to recreate a sequence->table
1398  * dependency.
1399  */
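/*
 * For illustration (hypothetical names), the generated command looks like:
 *
 *   SELECT pg_catalog.worker_record_sequence_dependency
 *   ('public.orders_id_seq'::regclass,'public.orders'::regclass,'id')
 */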
1400 static char *
1401 CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId, char *columnName)
1402 {
1403 	char *relationName = generate_qualified_relation_name(relationId);
1404 	char *sequenceName = generate_qualified_relation_name(sequenceId);
1405 
1406 	StringInfo sequenceDependencyCommand = makeStringInfo();
1407 
1408 	appendStringInfo(sequenceDependencyCommand,
1409 					 "SELECT pg_catalog.worker_record_sequence_dependency"
1410 					 "(%s::regclass,%s::regclass,%s)",
1411 					 quote_literal_cstr(sequenceName),
1412 					 quote_literal_cstr(relationName),
1413 					 quote_literal_cstr(columnName));
1414 
1415 	return sequenceDependencyCommand->data;
1416 }
1417 
1418 
1419 /*
1420  * worker_record_sequence_dependency records the fact that the sequence depends on
1421  * the table in pg_depend, such that it will be automatically dropped.
1422  */
1423 Datum
1424 worker_record_sequence_dependency(PG_FUNCTION_ARGS)
1425 {
1426 	Oid sequenceOid = PG_GETARG_OID(0);
1427 	Oid relationOid = PG_GETARG_OID(1);
1428 	Name columnName = PG_GETARG_NAME(2);
1429 	const char *columnNameStr = NameStr(*columnName);
1430 
1431 	/* lookup column definition */
1432 	HeapTuple columnTuple = SearchSysCacheAttName(relationOid, columnNameStr);
1433 	if (!HeapTupleIsValid(columnTuple))
1434 	{
1435 		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN),
1436 						errmsg("column \"%s\" does not exist",
1437 							   columnNameStr)));
1438 	}
1439 
1440 	Form_pg_attribute columnForm = (Form_pg_attribute) GETSTRUCT(columnTuple);
1441 	if (columnForm->attnum <= 0)
1442 	{
1443 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1444 						errmsg("cannot create dependency on system column \"%s\"",
1445 							   columnNameStr)));
1446 	}
1447 
1448 	ObjectAddress sequenceAddr = {
1449 		.classId = RelationRelationId,
1450 		.objectId = sequenceOid,
1451 		.objectSubId = 0
1452 	};
1453 	ObjectAddress relationAddr = {
1454 		.classId = RelationRelationId,
1455 		.objectId = relationOid,
1456 		.objectSubId = columnForm->attnum
1457 	};
1458 
1459 
1460 	EnsureTableOwner(sequenceOid);
1461 	EnsureTableOwner(relationOid);
1462 
1463 	/* dependency from sequence to table */
1464 	recordDependencyOn(&sequenceAddr, &relationAddr, DEPENDENCY_AUTO);
1465 
1466 	ReleaseSysCache(columnTuple);
1467 
1468 	PG_RETURN_VOID();
1469 }
1470 
1471 
1472 /*
1473  * CreateSchemaDDLCommand returns a "CREATE SCHEMA..." SQL string for creating the given
1474  * schema if it does not exist, with the proper authorization.
1475  */
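/*
 * A sketch of the output, assuming CREATE_SCHEMA_COMMAND expands to the usual
 * "CREATE SCHEMA IF NOT EXISTS ... AUTHORIZATION ..." form (hypothetical schema
 * and owner):
 *
 *   CREATE SCHEMA IF NOT EXISTS analytics AUTHORIZATION alice
 */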
1476 char *
1477 CreateSchemaDDLCommand(Oid schemaId)
1478 {
1479 	char *schemaName = get_namespace_name(schemaId);
1480 
1481 	StringInfo schemaNameDef = makeStringInfo();
1482 	const char *quotedSchemaName = quote_identifier(schemaName);
1483 	const char *ownerName = quote_identifier(SchemaOwnerName(schemaId));
1484 	appendStringInfo(schemaNameDef, CREATE_SCHEMA_COMMAND, quotedSchemaName, ownerName);
1485 
1486 	return schemaNameDef->data;
1487 }
1488 
1489 
1490 /*
1491  * GrantOnSchemaDDLCommands creates a list of DDL commands for replicating the permissions
1492  * of roles on schemas.
1493  */
1494 List *
1495 GrantOnSchemaDDLCommands(Oid schemaOid)
1496 {
1497 	HeapTuple schemaTuple = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(schemaOid));
1498 	bool isNull = true;
1499 	Datum aclDatum = SysCacheGetAttr(NAMESPACEOID, schemaTuple, Anum_pg_namespace_nspacl,
1500 									 &isNull);
1501 	if (isNull)
1502 	{
1503 		ReleaseSysCache(schemaTuple);
1504 		return NIL;
1505 	}
1506 	Acl *acl = DatumGetAclPCopy(aclDatum);
1507 	AclItem *aclDat = ACL_DAT(acl);
1508 	int aclNum = ACL_NUM(acl);
1509 	List *commands = NIL;
1510 
1511 	ReleaseSysCache(schemaTuple);
1512 
1513 	for (int i = 0; i < aclNum; i++)
1514 	{
1515 		commands = list_concat(commands,
1516 							   GenerateGrantOnSchemaQueriesFromAclItem(
1517 								   schemaOid,
1518 								   &aclDat[i]));
1519 	}
1520 
1521 	return commands;
1522 }
1523 
1524 
1525 /*
1526  * GenerateGrantOnSchemaQueriesFromAclItem generates the queries for replicating a
1527  * user's permissions on a schema.
1528  */
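/*
 * For illustration, for a hypothetical ACL item granting USAGE and CREATE (without
 * grant option) to role alice by role postgres, the returned queries are roughly:
 *
 *   SET ROLE postgres
 *   GRANT USAGE ON SCHEMA analytics TO alice
 *   GRANT CREATE ON SCHEMA analytics TO alice
 *   RESET ROLE
 */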
1529 List *
1530 GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid, AclItem *aclItem)
1531 {
1532 	AclMode permissions = ACLITEM_GET_PRIVS(*aclItem) & ACL_ALL_RIGHTS_SCHEMA;
1533 	AclMode grants = ACLITEM_GET_GOPTIONS(*aclItem) & ACL_ALL_RIGHTS_SCHEMA;
1534 
1535 	/*
1536 	 * It seems unlikely, but we assert that there is no grant option without the corresponding permission.
1537 	 */
1538 	Assert(!(grants & ACL_USAGE) || (permissions & ACL_USAGE));
1539 	Assert(!(grants & ACL_CREATE) || (permissions & ACL_CREATE));
1540 	Oid granteeOid = aclItem->ai_grantee;
1541 	List *queries = NIL;
1542 
1543 	queries = lappend(queries, GenerateSetRoleQuery(aclItem->ai_grantor));
1544 
1545 	if (permissions & ACL_USAGE)
1546 	{
1547 		char *query = DeparseTreeNode((Node *) GenerateGrantOnSchemaStmtForRights(
1548 										  granteeOid, schemaOid, "USAGE", grants &
1549 										  ACL_USAGE));
1550 		queries = lappend(queries, query);
1551 	}
1552 	if (permissions & ACL_CREATE)
1553 	{
1554 		char *query = DeparseTreeNode((Node *) GenerateGrantOnSchemaStmtForRights(
1555 										  granteeOid, schemaOid, "CREATE", grants &
1556 										  ACL_CREATE));
1557 		queries = lappend(queries, query);
1558 	}
1559 
1560 	queries = lappend(queries, "RESET ROLE");
1561 
1562 	return queries;
1563 }
1564 
1565 
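/*
 * GenerateGrantOnSchemaStmtForRights builds a GrantStmt parse node that grants the
 * given permission on the given schema to the given role, optionally WITH GRANT
 * OPTION. An invalid roleOid is interpreted as PUBLIC.
 */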
1566 static GrantStmt *
1567 GenerateGrantOnSchemaStmtForRights(Oid roleOid,
1568 								   Oid schemaOid,
1569 								   char *permission,
1570 								   bool withGrantOption)
1571 {
1572 	AccessPriv *accessPriv = makeNode(AccessPriv);
1573 	accessPriv->priv_name = permission;
1574 	accessPriv->cols = NULL;
1575 
1576 	RoleSpec *roleSpec = makeNode(RoleSpec);
1577 	roleSpec->roletype = OidIsValid(roleOid) ? ROLESPEC_CSTRING : ROLESPEC_PUBLIC;
1578 	roleSpec->rolename = OidIsValid(roleOid) ? GetUserNameFromId(roleOid, false) : NULL;
1579 	roleSpec->location = -1;
1580 
1581 	GrantStmt *stmt = makeNode(GrantStmt);
1582 	stmt->is_grant = true;
1583 	stmt->targtype = ACL_TARGET_OBJECT;
1584 	stmt->objtype = OBJECT_SCHEMA;
1585 	stmt->objects = list_make1(makeString(get_namespace_name(schemaOid)));
1586 	stmt->privileges = list_make1(accessPriv);
1587 	stmt->grantees = list_make1(roleSpec);
1588 	stmt->grant_option = withGrantOption;
1589 	return stmt;
1590 }
1591 
1592 
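/*
 * GenerateSetRoleQuery returns a "SET ROLE <role>" command for the given role,
 * quoting the role name when necessary.
 */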
1593 static char *
1594 GenerateSetRoleQuery(Oid roleOid)
1595 {
1596 	StringInfo buf = makeStringInfo();
1597 	appendStringInfo(buf, "SET ROLE %s", quote_identifier(GetUserNameFromId(roleOid,
1598 																			false)));
1599 	return buf->data;
1600 }
1601 
1602 
1603 /*
1604  * TruncateTriggerCreateCommand creates a SQL query calling worker_create_truncate_trigger
1605  * function, which creates the truncate trigger on the worker.
1606  */
1607 static char *
1608 TruncateTriggerCreateCommand(Oid relationId)
1609 {
1610 	StringInfo triggerCreateCommand = makeStringInfo();
1611 	char *tableName = generate_qualified_relation_name(relationId);
1612 
1613 	appendStringInfo(triggerCreateCommand,
1614 					 "SELECT worker_create_truncate_trigger(%s)",
1615 					 quote_literal_cstr(tableName));
1616 
1617 	return triggerCreateCommand->data;
1618 }
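
/*
 * For a distributed table named public.orders (hypothetical name), the generated
 * command would be:
 *
 *   SELECT worker_create_truncate_trigger('public.orders')
 */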
1619 
1620 
1621 /*
1622  * SchemaOwnerName returns the name of the owner of the specified schema.
1623  */
1624 static char *
1625 SchemaOwnerName(Oid objectId)
1626 {
1627 	Oid ownerId = InvalidOid;
1628 
1629 	HeapTuple tuple = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(objectId));
1630 	if (HeapTupleIsValid(tuple))
1631 	{
1632 		ownerId = ((Form_pg_namespace) GETSTRUCT(tuple))->nspowner;
1633 		/* only release the tuple if the cache lookup actually found one */
1634 		ReleaseSysCache(tuple);
1635 	}
1636 	else
1637 	{
1638 		ownerId = GetUserId();
1639 	}
1640 
1641 	char *ownerName = GetUserNameFromId(ownerId, false);
1642 
1643 	return ownerName;
1644 }
1645 
1646 
1647 /*
1648  * HasMetadataWorkers returns true if any of the workers in the cluster has its
1649  * hasmetadata column set to true, which happens when start_metadata_sync_to_node
1650  * command is run.
1651  */
1652 static bool
1653 HasMetadataWorkers(void)
1654 {
1655 	List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
1656 
1657 	WorkerNode *workerNode = NULL;
1658 	foreach_ptr(workerNode, workerNodeList)
1659 	{
1660 		if (workerNode->hasMetadata)
1661 		{
1662 			return true;
1663 		}
1664 	}
1665 
1666 	return false;
1667 }
1668 
1669 
1670 /*
1671  * CreateTableMetadataOnWorkers creates the list of commands needed to create the
1672  * given distributed table and sends these commands to all metadata workers, i.e. workers
1673  * with hasmetadata=true. Before sending the commands, in order to prevent recursive
1674  * propagation, DDL propagation on the workers is disabled with a
1675  * `SET citus.enable_ddl_propagation TO off;` command.
1676  */
1677 void
1678 CreateTableMetadataOnWorkers(Oid relationId)
1679 {
1680 	List *commandList = GetDistributedTableDDLEvents(relationId);
1681 
1682 	/* prevent recursive propagation */
1683 	SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
1684 
1685 	/* send the commands one by one */
1686 	const char *command = NULL;
1687 	foreach_ptr(command, commandList)
1688 	{
1689 		SendCommandToWorkersWithMetadata(command);
1690 	}
1691 }
1692 
1693 
1694 /*
1695  * DetachPartitionCommandList returns the list of DETACH commands to detach partitions
1696  * of all distributed tables. This function is used for detaching partitions in MX
1697  * workers before DROPping distributed partitioned tables in them. Thus, we prepend a
1698  * command that disables DDL propagation to the command list (and append one that
1699  * re-enables DDL propagation at the end, to switch back to the original state). As
1700  * an extra step, if there are no partitions to DETACH, this function simply returns an
1701  * empty list to avoid disabling/enabling DDL propagation for nothing.
1702  */
1703 static List *
1704 DetachPartitionCommandList(void)
1705 {
1706 	List *detachPartitionCommandList = NIL;
1707 	List *distributedTableList = CitusTableList();
1708 
1709 	/* we iterate over all distributed partitioned tables and DETACH their partitions */
1710 	CitusTableCacheEntry *cacheEntry = NULL;
1711 	foreach_ptr(cacheEntry, distributedTableList)
1712 	{
1713 		if (!PartitionedTable(cacheEntry->relationId))
1714 		{
1715 			continue;
1716 		}
1717 
1718 		List *partitionList = PartitionList(cacheEntry->relationId);
1719 		Oid partitionRelationId = InvalidOid;
1720 		foreach_oid(partitionRelationId, partitionList)
1721 		{
1722 			char *detachPartitionCommand =
1723 				GenerateDetachPartitionCommand(partitionRelationId);
1724 
1725 			detachPartitionCommandList = lappend(detachPartitionCommandList,
1726 												 detachPartitionCommand);
1727 		}
1728 	}
1729 
1730 	if (list_length(detachPartitionCommandList) == 0)
1731 	{
1732 		return NIL;
1733 	}
1734 
1735 	detachPartitionCommandList =
1736 		lcons(DISABLE_DDL_PROPAGATION, detachPartitionCommandList);
1737 
1738 	/*
1739 	 * We probably do not need this, but as an extra precaution, we re-enable
1740 	 * DDL propagation to switch back to the original state.
1741 	 */
1742 	detachPartitionCommandList = lappend(detachPartitionCommandList,
1743 										 ENABLE_DDL_PROPAGATION);
1744 
1745 	return detachPartitionCommandList;
1746 }
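
/*
 * The returned list has the following shape (hypothetical table names; the exact
 * text comes from the DISABLE_DDL_PROPAGATION / ENABLE_DDL_PROPAGATION constants
 * and from GenerateDetachPartitionCommand()):
 *
 *   SET citus.enable_ddl_propagation TO 'off'
 *   ALTER TABLE events DETACH PARTITION events_2021
 *   ...
 *   SET citus.enable_ddl_propagation TO 'on'
 */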
1747 
1748 
1749 /*
1750  * SyncMetadataToNodes tries recreating the metadata snapshot in the
1751  * metadata workers that are out of sync. Returns the result of
1752  * synchronization.
1753  */
1754 static MetadataSyncResult
1755 SyncMetadataToNodes(void)
1756 {
1757 	MetadataSyncResult result = METADATA_SYNC_SUCCESS;
1758 
1759 	if (!IsCoordinator())
1760 	{
1761 		return METADATA_SYNC_SUCCESS;
1762 	}
1763 
1764 	/*
1765 	 * Request a RowExclusiveLock so we don't run concurrently with other
1766 	 * functions updating pg_dist_node, but allow concurrency with functions
1767 	 * which are just reading from pg_dist_node.
1768 	 */
1769 	if (!ConditionalLockRelationOid(DistNodeRelationId(), RowExclusiveLock))
1770 	{
1771 		return METADATA_SYNC_FAILED_LOCK;
1772 	}
1773 
1774 	List *syncedWorkerList = NIL;
1775 	List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
1776 	WorkerNode *workerNode = NULL;
1777 	foreach_ptr(workerNode, workerList)
1778 	{
1779 		if (workerNode->hasMetadata && !workerNode->metadataSynced)
1780 		{
1781 			bool raiseInterrupts = false;
1782 			if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
1783 			{
1784 				ereport(WARNING, (errmsg("failed to sync metadata to %s:%d",
1785 										 workerNode->workerName,
1786 										 workerNode->workerPort)));
1787 				result = METADATA_SYNC_FAILED_SYNC;
1788 			}
1789 			else
1790 			{
1791 				/* we add successfully synced nodes to set metadatasynced column later */
1792 				syncedWorkerList = lappend(syncedWorkerList, workerNode);
1793 			}
1794 		}
1795 	}
1796 
1797 	foreach_ptr(workerNode, syncedWorkerList)
1798 	{
1799 		SetWorkerColumnOptional(workerNode, Anum_pg_dist_node_metadatasynced,
1800 								BoolGetDatum(true));
1801 
1802 		/* we fetch the same node again to check if it's synced or not */
1803 		WorkerNode *nodeUpdated = FindWorkerNode(workerNode->workerName,
1804 												 workerNode->workerPort);
1805 		if (!nodeUpdated->metadataSynced)
1806 		{
1807 			/* set the result to FAILED to trigger the sync again */
1808 			result = METADATA_SYNC_FAILED_SYNC;
1809 		}
1810 	}
1811 
1812 	return result;
1813 }
1814 
1815 
1816 /*
1817  * SyncMetadataToNodesMain is the main function for syncing metadata to
1818  * MX nodes. It retries until success and then exits.
1819  */
1820 void
1821 SyncMetadataToNodesMain(Datum main_arg)
1822 {
1823 	Oid databaseOid = DatumGetObjectId(main_arg);
1824 
1825 	/* extension owner is passed via bgw_extra */
1826 	Oid extensionOwner = InvalidOid;
1827 	memcpy_s(&extensionOwner, sizeof(extensionOwner),
1828 			 MyBgworkerEntry->bgw_extra, sizeof(Oid));
1829 
1830 	pqsignal(SIGTERM, MetadataSyncSigTermHandler);
1831 	pqsignal(SIGALRM, MetadataSyncSigAlrmHandler);
1832 	BackgroundWorkerUnblockSignals();
1833 
1834 	/* connect to database, after that we can actually access catalogs */
1835 	BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);
1836 
1837 	/* make worker recognizable in pg_stat_activity */
1838 	pgstat_report_appname(METADATA_SYNC_APP_NAME);
1839 
1840 	bool syncedAllNodes = false;
1841 
1842 	while (!syncedAllNodes)
1843 	{
1844 		InvalidateMetadataSystemCache();
1845 		StartTransactionCommand();
1846 
1847 		/*
1848 		 * Some functions in ruleutils.c, which we use to get the DDL for
1849 		 * metadata propagation, require an active snapshot.
1850 		 */
1851 		PushActiveSnapshot(GetTransactionSnapshot());
1852 
1853 		if (!LockCitusExtension())
1854 		{
1855 			ereport(DEBUG1, (errmsg("could not lock the citus extension, "
1856 									"skipping metadata sync")));
1857 		}
1858 		else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
1859 		{
1860 			UseCoordinatedTransaction();
1861 			MetadataSyncResult result = SyncMetadataToNodes();
1862 
1863 			syncedAllNodes = (result == METADATA_SYNC_SUCCESS);
1864 
1865 			/* we use LISTEN/NOTIFY to wait for metadata syncing in tests */
1866 			if (result != METADATA_SYNC_FAILED_LOCK)
1867 			{
1868 				Async_Notify(METADATA_SYNC_CHANNEL, NULL);
1869 			}
1870 		}
1871 
1872 		PopActiveSnapshot();
1873 		CommitTransactionCommand();
1874 		ProcessCompletedNotifies();
1875 
1876 		if (syncedAllNodes)
1877 		{
1878 			break;
1879 		}
1880 
1881 		/*
1882 		 * If the backend is cancelled (e.g. because of a distributed deadlock),
1883 		 * CHECK_FOR_INTERRUPTS() will raise a cancellation error which will
1884 		 * result in exit(1).
1885 		 */
1886 		CHECK_FOR_INTERRUPTS();
1887 
1888 		/*
1889 		 * SIGTERM is used when the maintenance daemon tries to clean up metadata
1890 		 * sync daemons spawned by terminated maintenance daemons.
1891 		 */
1892 		if (got_SIGTERM)
1893 		{
1894 			exit(0);
1895 		}
1896 
1897 		/*
1898 		 * SIGALRM is only used for testing purposes; it simulates an error in the
1899 		 * metadata sync daemon.
1900 		 */
1901 		if (got_SIGALRM)
1902 		{
1903 			elog(ERROR, "Error in metadata sync daemon");
1904 		}
1905 
1906 		pg_usleep(MetadataSyncRetryInterval * 1000);
1907 	}
1908 }
1909 
1910 
1911 /*
1912  * MetadataSyncSigTermHandler sets a flag to request termination of the metadata
1913  * sync daemon.
1914  */
1915 static void
1916 MetadataSyncSigTermHandler(SIGNAL_ARGS)
1917 {
1918 	int save_errno = errno;
1919 
1920 	got_SIGTERM = true;
1921 	if (MyProc != NULL)
1922 	{
1923 		SetLatch(&MyProc->procLatch);
1924 	}
1925 
1926 	errno = save_errno;
1927 }
1928 
1929 
1930 /*
1931  * MetadataSyncSigAlrmHandler sets a flag to request an error in the metadata
1932  * sync daemon. This is only used for testing purposes.
1933  */
1934 static void
1935 MetadataSyncSigAlrmHandler(SIGNAL_ARGS)
1936 {
1937 	int save_errno = errno;
1938 
1939 	got_SIGALRM = true;
1940 	if (MyProc != NULL)
1941 	{
1942 		SetLatch(&MyProc->procLatch);
1943 	}
1944 
1945 	errno = save_errno;
1946 }
1947 
1948 
1949 /*
1950  * SpawnSyncMetadataToNodes starts a background worker which runs metadata
1951  * sync. On success it returns the worker's handle. Otherwise, it returns NULL.
1952  */
1953 BackgroundWorkerHandle *
1954 SpawnSyncMetadataToNodes(Oid database, Oid extensionOwner)
1955 {
1956 	BackgroundWorker worker;
1957 	BackgroundWorkerHandle *handle = NULL;
1958 
1959 	/* Configure a worker. */
1960 	memset(&worker, 0, sizeof(worker));
1961 	SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
1962 				 "Citus Metadata Sync: %u/%u",
1963 				 database, extensionOwner);
1964 	worker.bgw_flags =
1965 		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
1966 	worker.bgw_start_time = BgWorkerStart_ConsistentState;
1967 
1968 	/* don't restart, we manage restarts from maintenance daemon */
1969 	worker.bgw_restart_time = BGW_NEVER_RESTART;
1970 	strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
1971 	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
1972 			 "SyncMetadataToNodesMain");
1973 	worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
1974 	memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
1975 			 sizeof(Oid));
1976 	worker.bgw_notify_pid = MyProcPid;
1977 
1978 	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
1979 	{
1980 		return NULL;
1981 	}
1982 
1983 	pid_t pid;
1984 	WaitForBackgroundWorkerStartup(handle, &pid);
1985 
1986 	return handle;
1987 }
1988 
1989 
1990 /*
1991  * SignalMetadataSyncDaemon signals metadata sync daemons belonging to
1992  * the given database.
1993  */
1994 void
1995 SignalMetadataSyncDaemon(Oid database, int sig)
1996 {
1997 	int backendCount = pgstat_fetch_stat_numbackends();
1998 	for (int backend = 1; backend <= backendCount; backend++)
1999 	{
2000 		LocalPgBackendStatus *localBeEntry = pgstat_fetch_stat_local_beentry(backend);
2001 		if (!localBeEntry)
2002 		{
2003 			continue;
2004 		}
2005 
2006 		PgBackendStatus *beStatus = &localBeEntry->backendStatus;
2007 		if (beStatus->st_databaseid == database &&
2008 			strncmp(beStatus->st_appname, METADATA_SYNC_APP_NAME, BGW_MAXLEN) == 0)
2009 		{
2010 			kill(beStatus->st_procpid, sig);
2011 		}
2012 	}
2013 }
2014 
2015 
2016 /*
2017  * ShouldInitiateMetadataSync returns whether the metadata sync daemon should be initiated.
2018  * It sets *lockFailure to true if the pg_dist_node lock couldn't be acquired for the
2019  * check.
2020  */
2021 bool
2022 ShouldInitiateMetadataSync(bool *lockFailure)
2023 {
2024 	if (!IsCoordinator())
2025 	{
2026 		*lockFailure = false;
2027 		return false;
2028 	}
2029 
2030 	Oid distNodeOid = DistNodeRelationId();
2031 	if (!ConditionalLockRelationOid(distNodeOid, AccessShareLock))
2032 	{
2033 		*lockFailure = true;
2034 		return false;
2035 	}
2036 
2037 	bool shouldSyncMetadata = false;
2038 
2039 	List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
2040 	WorkerNode *workerNode = NULL;
2041 	foreach_ptr(workerNode, workerList)
2042 	{
2043 		if (workerNode->hasMetadata && !workerNode->metadataSynced)
2044 		{
2045 			shouldSyncMetadata = true;
2046 			break;
2047 		}
2048 	}
2049 
2050 	UnlockRelationOid(distNodeOid, AccessShareLock);
2051 
2052 	*lockFailure = false;
2053 	return shouldSyncMetadata;
2054 }
2055 
2056 
2057 /*
2058  * citus_internal_add_partition_metadata is an internal UDF to
2059  * add a row to pg_dist_partition.
2060  */
2061 Datum
2062 citus_internal_add_partition_metadata(PG_FUNCTION_ARGS)
2063 {
2064 	CheckCitusVersion(ERROR);
2065 
2066 	PG_ENSURE_ARGNOTNULL(0, "relation");
2067 	Oid relationId = PG_GETARG_OID(0);
2068 
2069 	PG_ENSURE_ARGNOTNULL(1, "distribution method");
2070 	char distributionMethod = PG_GETARG_CHAR(1);
2071 
2072 	PG_ENSURE_ARGNOTNULL(3, "colocation id");
2073 	int colocationId = PG_GETARG_INT32(3);
2074 
2075 	PG_ENSURE_ARGNOTNULL(4, "replication model");
2076 	char replicationModel = PG_GETARG_CHAR(4);
2077 
2078 	text *distributionColumnText = NULL;
2079 	char *distributionColumnString = NULL;
2080 	Var *distributionColumnVar = NULL;
2081 
2082 	/* only owner of the table (or superuser) is allowed to add the Citus metadata */
2083 	EnsureTableOwner(relationId);
2084 
2085 	/* we want to serialize all the metadata changes to this table */
2086 	LockRelationOid(relationId, ShareUpdateExclusiveLock);
2087 
2088 	if (!PG_ARGISNULL(2))
2089 	{
2090 		distributionColumnText = PG_GETARG_TEXT_P(2);
2091 		distributionColumnString = text_to_cstring(distributionColumnText);
2092 
2093 		Relation relation = relation_open(relationId, AccessShareLock);
2094 		distributionColumnVar =
2095 			BuildDistributionKeyFromColumnName(relation, distributionColumnString);
2096 		Assert(distributionColumnVar != NULL);
2097 
2098 		relation_close(relation, NoLock);
2099 	}
2100 
2101 	if (!ShouldSkipMetadataChecks())
2102 	{
2103 		/* this UDF is not allowed to be executed as a separate command */
2104 		EnsureCoordinatorInitiatedOperation();
2105 
2106 		if (distributionMethod == DISTRIBUTE_BY_NONE && distributionColumnVar != NULL)
2107 		{
2108 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2109 							errmsg("Reference or local tables cannot have "
2110 								   "distribution columns")));
2111 		}
2112 		else if (distributionMethod != DISTRIBUTE_BY_NONE &&
2113 				 distributionColumnVar == NULL)
2114 		{
2115 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2116 							errmsg("Distribution column cannot be NULL for "
2117 								   "relation \"%s\"", get_rel_name(relationId))));
2118 		}
2119 
2120 		/*
2121 		 * Even if the table owner is a malicious user and the partition
2122 		 * metadata is not sane, the user can only affect its own tables.
2123 		 * Given that the user is owner of the table, we should allow.
2124 		 */
2125 		EnsurePartitionMetadataIsSane(relationId, distributionMethod, colocationId,
2126 									  replicationModel, distributionColumnVar);
2127 	}
2128 
2129 	InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumnVar,
2130 							  colocationId, replicationModel);
2131 
2132 	PG_RETURN_VOID();
2133 }
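
/*
 * A hypothetical invocation, as it would typically arrive from the coordinator
 * during metadata sync (relation name and ids are made up):
 *
 *   SELECT citus_internal_add_partition_metadata('dist_table'::regclass,
 *                                                'h', 'id', 1400001, 's');
 */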
2134 
2135 
2136 /*
2137  * EnsurePartitionMetadataIsSane ensures that the input values are safe
2138  * for inserting into pg_dist_partition metadata.
2139  */
2140 static void
2141 EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod, int colocationId,
2142 							  char replicationModel, Var *distributionColumnVar)
2143 {
2144 	if (!(distributionMethod == DISTRIBUTE_BY_HASH ||
2145 		  distributionMethod == DISTRIBUTE_BY_NONE))
2146 	{
2147 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2148 						errmsg("Metadata syncing is only allowed for hash, reference "
2149 							   "and local tables: %c", distributionMethod)));
2150 	}
2151 
2152 	if (colocationId < INVALID_COLOCATION_ID)
2153 	{
2154 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2155 						errmsg("Metadata syncing is only allowed for valid "
2156 							   "colocation id values.")));
2157 	}
2158 	else if (colocationId != INVALID_COLOCATION_ID &&
2159 			 distributionMethod == DISTRIBUTE_BY_HASH)
2160 	{
2161 		int count = 1;
2162 		List *targetColocatedTableList =
2163 			ColocationGroupTableList(colocationId, count);
2164 
2165 		/*
2166 		 * If we have any colocated hash tables, ensure that they share the
2167 		 * same distribution key properties.
2168 		 */
2169 		if (list_length(targetColocatedTableList) >= 1)
2170 		{
2171 			Oid targetRelationId = linitial_oid(targetColocatedTableList);
2172 
2173 			EnsureColumnTypeEquality(relationId, targetRelationId, distributionColumnVar,
2174 									 DistPartitionKeyOrError(targetRelationId));
2175 		}
2176 	}
2177 
2179 	if (!(replicationModel == REPLICATION_MODEL_2PC ||
2180 		  replicationModel == REPLICATION_MODEL_STREAMING ||
2181 		  replicationModel == REPLICATION_MODEL_COORDINATOR))
2182 	{
2183 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2184 						errmsg("Metadata syncing is only allowed for "
2185 							   "known replication models.")));
2186 	}
2187 
2188 	if (distributionMethod == DISTRIBUTE_BY_HASH &&
2189 		replicationModel != REPLICATION_MODEL_STREAMING)
2190 	{
2191 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2192 						errmsg("Hash distributed tables can only have '%c' "
2193 							   "as the replication model.",
2194 							   REPLICATION_MODEL_STREAMING)));
2195 	}
2196 
2197 	if (distributionMethod == DISTRIBUTE_BY_NONE &&
2198 		!(replicationModel == REPLICATION_MODEL_STREAMING ||
2199 		  replicationModel == REPLICATION_MODEL_2PC))
2200 	{
2201 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2202 						errmsg("Local or reference tables can only have '%c' or '%c' "
2203 							   "as the replication model.",
2204 							   REPLICATION_MODEL_STREAMING, REPLICATION_MODEL_2PC)));
2205 	}
2206 }
2207 
2208 
2209 /*
2210  * citus_internal_add_shard_metadata is an internal UDF to
2211  * add a row to pg_dist_shard.
2212  */
2213 Datum
2214 citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
2215 {
2216 	CheckCitusVersion(ERROR);
2217 
2218 	PG_ENSURE_ARGNOTNULL(0, "relation");
2219 	Oid relationId = PG_GETARG_OID(0);
2220 
2221 	PG_ENSURE_ARGNOTNULL(1, "shard id");
2222 	int64 shardId = PG_GETARG_INT64(1);
2223 
2224 	PG_ENSURE_ARGNOTNULL(2, "storage type");
2225 	char storageType = PG_GETARG_CHAR(2);
2226 
2227 	text *shardMinValue = NULL;
2228 	if (!PG_ARGISNULL(3))
2229 	{
2230 		shardMinValue = PG_GETARG_TEXT_P(3);
2231 	}
2232 
2233 	text *shardMaxValue = NULL;
2234 	if (!PG_ARGISNULL(4))
2235 	{
2236 		shardMaxValue = PG_GETARG_TEXT_P(4);
2237 	}
2238 
2239 	/* only owner of the table (or superuser) is allowed to add the Citus metadata */
2240 	EnsureTableOwner(relationId);
2241 
2242 	/* we want to serialize all the metadata changes to this table */
2243 	LockRelationOid(relationId, ShareUpdateExclusiveLock);
2244 
2245 	if (!ShouldSkipMetadataChecks())
2246 	{
2247 		/* this UDF is not allowed to be executed as a separate command */
2248 		EnsureCoordinatorInitiatedOperation();
2249 
2250 		/*
2251 		 * Even if the table owner is a malicious user and the shard metadata is
2252 		 * not sane, the user can only affect its own tables. Given that the
2253 		 * user is owner of the table, we should allow.
2254 		 */
2255 		EnsureShardMetadataIsSane(relationId, shardId, storageType, shardMinValue,
2256 								  shardMaxValue);
2257 	}
2258 
2259 	InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue);
2260 
2261 	PG_RETURN_VOID();
2262 }
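
/*
 * A hypothetical invocation for a hash distributed table (shard id and ranges are
 * made up; reference and local tables would pass NULL shard ranges instead):
 *
 *   SELECT citus_internal_add_shard_metadata('dist_table'::regclass, 102008,
 *                                            't', '-2147483648', '-1');
 */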
2263 
2264 
2265 /*
2266  * EnsureCoordinatorInitiatedOperation is a helper function which ensures that
2267  * the execution is initiated by the coordinator on a worker node.
2268  */
2269 static void
2270 EnsureCoordinatorInitiatedOperation(void)
2271 {
2272 	/*
2273 	 * We are restricting the operation to only MX workers with the local group id
2274 	 * check. The other two checks are to ensure that the operation is initiated
2275 	 * by the coordinator.
2276 	 */
2277 	if (!IsCitusInitiatedRemoteBackend() || !MyBackendIsInDisributedTransaction() ||
2278 		GetLocalGroupId() == COORDINATOR_GROUP_ID)
2279 	{
2280 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2281 						errmsg("This is an internal Citus function that can only be "
2282 							   "used in a distributed transaction")));
2283 	}
2284 }
2285 
2286 
2287 /*
2288  * EnsureShardMetadataIsSane ensures that the input values are safe
2289  * for inserting into pg_dist_shard metadata.
2290  */
2291 static void
2292 EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
2293 						  text *shardMinValue, text *shardMaxValue)
2294 {
2295 	if (shardId <= INVALID_SHARD_ID)
2296 	{
2297 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2298 						errmsg("Invalid shard id: %ld", shardId)));
2299 	}
2300 
2301 	if (!(storageType == SHARD_STORAGE_TABLE ||
2302 		  storageType == SHARD_STORAGE_FOREIGN ||
2303 		  storageType == SHARD_STORAGE_COLUMNAR))
2304 	{
2305 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2306 						errmsg("Invalid shard storage type: %c", storageType)));
2307 	}
2308 
2309 	char partitionMethod = PartitionMethodViaCatalog(relationId);
2310 	if (partitionMethod == DISTRIBUTE_BY_INVALID)
2311 	{
2312 		/* connection from the coordinator operating on a shard */
2313 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2314 						errmsg("The relation \"%s\" does not have a valid "
2315 							   "entry in pg_dist_partition.",
2316 							   get_rel_name(relationId))));
2317 	}
2318 	else if (!(partitionMethod == DISTRIBUTE_BY_HASH ||
2319 			   partitionMethod == DISTRIBUTE_BY_NONE))
2320 	{
2321 		/* connection from the coordinator operating on a shard */
2322 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2323 						errmsg("Metadata syncing is only allowed for hash, "
2324 							   "reference and local tables: %c", partitionMethod)));
2325 	}
2326 
2327 	List *distShardTupleList = LookupDistShardTuples(relationId);
2328 	if (partitionMethod == DISTRIBUTE_BY_NONE)
2329 	{
2330 		if (shardMinValue != NULL || shardMaxValue != NULL)
2331 		{
2332 			char *relationName = get_rel_name(relationId);
2333 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2334 							errmsg("Shards of reference or local table \"%s\" should "
2335 								   "have NULL shard ranges", relationName)));
2336 		}
2337 		else if (list_length(distShardTupleList) != 0)
2338 		{
2339 			char *relationName = get_rel_name(relationId);
2340 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2341 							errmsg("relation \"%s\" already has at least one shard, "
2342 								   "adding more is not allowed", relationName)));
2343 		}
2344 	}
2345 	else if (partitionMethod == DISTRIBUTE_BY_HASH)
2346 	{
2347 		if (shardMinValue == NULL || shardMaxValue == NULL)
2348 		{
2349 			char *relationName = get_rel_name(relationId);
2350 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2351 							errmsg("Shards of hash distributed table \"%s\" "
2352 								   "cannot have NULL shard ranges", relationName)));
2353 		}
2354 
2355 		char *shardMinValueString = text_to_cstring(shardMinValue);
2356 		char *shardMaxValueString = text_to_cstring(shardMaxValue);
2357 
2358 		/* pg_strtoint32 does the syntax and out of bound checks for us */
2359 		int32 shardMinValueInt = pg_strtoint32(shardMinValueString);
2360 		int32 shardMaxValueInt = pg_strtoint32(shardMaxValueString);
2361 
2362 		if (shardMinValueInt > shardMaxValueInt)
2363 		{
2364 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2365 							errmsg("shardMinValue=%d is greater than "
2366 								   "shardMaxValue=%d for table \"%s\", which is "
2367 								   "not allowed", shardMinValueInt,
2368 								   shardMaxValueInt, get_rel_name(relationId))));
2369 		}
2370 
2371 		/*
2372 		 * We are only dealing with hash distributed tables, that's why we
2373 		 * can hard code data type and typemod.
2374 		 */
2375 		const int intervalTypeId = INT4OID;
2376 		const int intervalTypeMod = -1;
2377 
2378 		Relation distShardRelation = table_open(DistShardRelationId(), AccessShareLock);
2379 		TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation);
2380 
2381 		FmgrInfo *shardIntervalCompareFunction =
2382 			GetFunctionInfo(intervalTypeId, BTREE_AM_OID, BTORDER_PROC);
2383 
2384 		HeapTuple shardTuple = NULL;
2385 		foreach_ptr(shardTuple, distShardTupleList)
2386 		{
2387 			ShardInterval *shardInterval =
2388 				TupleToShardInterval(shardTuple, distShardTupleDesc,
2389 									 intervalTypeId, intervalTypeMod);
2390 
2391 			Datum firstMin = Int32GetDatum(shardMinValueInt);
2392 			Datum firstMax = Int32GetDatum(shardMaxValueInt);
2393 			Datum secondMin = shardInterval->minValue;
2394 			Datum secondMax = shardInterval->maxValue;
2395 			Oid collationId = InvalidOid;
2396 
2397 			/*
2398 			 * This is an unexpected case as we are reading the metadata, which has
2399 			 * already been verified for being not NULL. Still, lets be extra
2400 			 * cautious to avoid any crashes.
2401 			 */
2402 			if (!shardInterval->minValueExists || !shardInterval->maxValueExists)
2403 			{
2404 				char *relationName = get_rel_name(relationId);
2405 				ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2406 								errmsg("Shards of hash distributed table \"%s\" "
2407 									   "cannot have NULL shard ranges", relationName)));
2408 			}
2409 
2410 			if (ShardIntervalsOverlapWithParams(firstMin, firstMax, secondMin, secondMax,
2411 												shardIntervalCompareFunction,
2412 												collationId))
2413 			{
2414 				ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2415 								errmsg("Shard intervals overlap for table \"%s\": "
2416 									   "%ld and %ld", get_rel_name(relationId),
2417 									   shardId, shardInterval->shardId)));
2418 			}
2419 		}
2420 
2421 		table_close(distShardRelation, NoLock);
2422 	}
2423 }
2424 
2425 
2426 /*
2427  * citus_internal_add_placement_metadata is an internal UDF to
2428  * add a row to pg_dist_placement.
2429  */
2430 Datum
2431 citus_internal_add_placement_metadata(PG_FUNCTION_ARGS)
2432 {
2433 	CheckCitusVersion(ERROR);
2434 
2435 	int64 shardId = PG_GETARG_INT64(0);
2436 	int32 shardState = PG_GETARG_INT32(1);
2437 	int64 shardLength = PG_GETARG_INT64(2);
2438 	int32 groupId = PG_GETARG_INT32(3);
2439 	int64 placementId = PG_GETARG_INT64(4);
2440 
2441 	bool missingOk = false;
2442 	Oid relationId = LookupShardRelationFromCatalog(shardId, missingOk);
2443 
2444 	/* only owner of the table is allowed to modify the metadata */
2445 	EnsureTableOwner(relationId);
2446 
2447 	/* we want to serialize all the metadata changes to this table */
2448 	LockRelationOid(relationId, ShareUpdateExclusiveLock);
2449 
2450 	if (!ShouldSkipMetadataChecks())
2451 	{
2452 		/* this UDF is not allowed to be executed as a separate command */
2453 		EnsureCoordinatorInitiatedOperation();
2454 
2455 		/*
2456 		 * Even if the table owner is a malicious user, as long as the shard placements
2457 		 * fit into basic requirements of Citus metadata, the user can only affect its
2458 		 * own tables. Given that the user is owner of the table, we should allow.
2459 		 */
2460 		EnsureShardPlacementMetadataIsSane(relationId, shardId, placementId, shardState,
2461 										   shardLength, groupId);
2462 	}
2463 
2464 	InsertShardPlacementRow(shardId, placementId, shardState, shardLength, groupId);
2465 
2466 	PG_RETURN_VOID();
2467 }
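
/*
 * A hypothetical invocation with made-up values, following the argument order
 * above (shard id, shard state, shard length, group id, placement id):
 *
 *   SELECT citus_internal_add_placement_metadata(102008, 1, 0, 2, 100008);
 */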
2468 
2469 
2470 /*
2471  * EnsureShardPlacementMetadataIsSane ensures that the input parameters for
2472  * the shard placement metadata are sane.
2473  */
2474 static void
2475 EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId, int64 placementId,
2476 								   int32 shardState, int64 shardLength, int32 groupId)
2477 {
2478 	/* we have just read the metadata, so we are sure that the shard exists */
2479 	Assert(ShardExists(shardId));
2480 
2481 	if (placementId <= INVALID_PLACEMENT_ID)
2482 	{
2483 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2484 						errmsg("Shard placement has invalid placement id "
2485 							   "(%ld) for shard(%ld)", placementId, shardId)));
2486 	}
2487 
2488 	if (shardState != SHARD_STATE_ACTIVE)
2489 	{
2490 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2491 						errmsg("Invalid shard state: %d", shardState)));
2492 	}
2493 
2494 	bool nodeIsInMetadata = false;
2495 	WorkerNode *workerNode =
2496 		PrimaryNodeForGroup(groupId, &nodeIsInMetadata);
2497 	if (!workerNode)
2498 	{
2499 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2500 						errmsg("Node with group id %d for shard placement "
2501 							   "%ld does not exist", groupId, shardId)));
2502 	}
2503 }
2504 
2505 
2506 /*
2507  * ShouldSkipMetadataChecks returns true if the current user is allowed to
2508  * make any manual metadata changes, in which case the sanity checks are skipped.
2509  */
2510 static bool
2511 ShouldSkipMetadataChecks(void)
2512 {
2513 	if (strcmp(EnableManualMetadataChangesForUser, "") != 0)
2514 	{
2515 		/*
2516 		 * EnableManualMetadataChangesForUser is a GUC which
2517 		 * can be changed by a super user. We use this GUC as
2518 		 * a safety belt in case the current metadata checks are
2519 		 * too restrictive and the operator can allow users to skip
2520 		 * the checks.
2521 		 */
2522 
2523 		/*
2524 		 * Make sure that the user exists, and print it to prevent any
2525 		 * optimization skipping the get_role_oid call.
2526 		 */
2527 		bool missingOK = false;
2528 		Oid allowedUserId = get_role_oid(EnableManualMetadataChangesForUser, missingOK);
2529 		if (allowedUserId == GetUserId())
2530 		{
2531 			return true;
2532 		}
2533 	}
2534 
2535 	return false;
2536 }
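
/*
 * For example, a superuser could let a specific role bypass these checks, assuming
 * the GUC backing EnableManualMetadataChangesForUser is exposed as
 * citus.enable_manual_metadata_changes_for_user:
 *
 *   SET citus.enable_manual_metadata_changes_for_user TO 'metadata_admin';
 */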
2537 
2538 
2539 /*
2540  * citus_internal_update_placement_metadata is an internal UDF to
2541  * update a row in pg_dist_placement.
2542  */
2543 Datum
2544 citus_internal_update_placement_metadata(PG_FUNCTION_ARGS)
2545 {
2546 	CheckCitusVersion(ERROR);
2547 
2548 	int64 shardId = PG_GETARG_INT64(0);
2549 	int32 sourceGroupId = PG_GETARG_INT32(1);
2550 	int32 targetGroupId = PG_GETARG_INT32(2);
2551 
2552 	ShardPlacement *placement = NULL;
2553 	if (!ShouldSkipMetadataChecks())
2554 	{
2555 		/* this UDF is not allowed to be executed as a separate command */
2556 		EnsureCoordinatorInitiatedOperation();
2557 
2558 		if (!ShardExists(shardId))
2559 		{
2560 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2561 							errmsg("Shard id does not exist: %ld", shardId)));
2562 		}
2563 
2564 		bool missingOk = false;
2565 		EnsureShardOwner(shardId, missingOk);
2566 
2567 		/*
2568 		 * This function ensures that the source group exists hence we
2569 		 * call it from this code-block.
2570 		 */
2571 		placement = ActiveShardPlacementOnGroup(sourceGroupId, shardId);
2572 
2573 		bool nodeIsInMetadata = false;
2574 		WorkerNode *workerNode =
2575 			PrimaryNodeForGroup(targetGroupId, &nodeIsInMetadata);
2576 		if (!workerNode)
2577 		{
2578 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2579 							errmsg("Node with group id %d for shard placement "
2580 								   "%ld does not exist", targetGroupId, shardId)));
2581 		}
2582 	}
2583 	else
2584 	{
2585 		placement = ActiveShardPlacementOnGroup(sourceGroupId, shardId);
2586 	}
2587 
2588 	/*
2589 	 * Updating pg_dist_placement ensures that the node with targetGroupId
2590 	 * exists and this is the only placement on that group.
2591 	 */
2592 	if (placement == NULL)
2593 	{
2594 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2595 						errmsg("Active placement for shard %ld is not "
2596 							   "found on source group %d", shardId, sourceGroupId)));
2597 	}
2598 
2599 	UpdatePlacementGroupId(placement->placementId, targetGroupId);
2600 
2601 	PG_RETURN_VOID();
2602 }
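
/*
 * A hypothetical invocation with made-up values, following the argument order
 * above (shard id, source group id, target group id):
 *
 *   SELECT citus_internal_update_placement_metadata(102008, 2, 3);
 */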
2603 
2604 
2605 /*
2606  * citus_internal_delete_shard_metadata is an internal UDF to
2607  * delete a row in pg_dist_shard and corresponding placement rows
2608  * from pg_dist_shard_placement.
2609  */
2610 Datum
2611 citus_internal_delete_shard_metadata(PG_FUNCTION_ARGS)
2612 {
2613 	CheckCitusVersion(ERROR);
2614 
2615 	int64 shardId = PG_GETARG_INT64(0);
2616 
2617 	if (!ShouldSkipMetadataChecks())
2618 	{
2619 		/* this UDF is not allowed to be executed as a separate command */
2620 		EnsureCoordinatorInitiatedOperation();
2621 
2622 		if (!ShardExists(shardId))
2623 		{
2624 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2625 							errmsg("Shard id does not exist: %ld", shardId)));
2626 		}
2627 
2628 		bool missingOk = false;
2629 		EnsureShardOwner(shardId, missingOk);
2630 	}
2631 
2632 	List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
2633 	ShardPlacement *shardPlacement = NULL;
2634 	foreach_ptr(shardPlacement, shardPlacementList)
2635 	{
2636 		DeleteShardPlacementRow(shardPlacement->placementId);
2637 	}
2638 
2639 	DeleteShardRow(shardId);
2640 
2641 	PG_RETURN_VOID();
2642 }
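
/*
 * A hypothetical invocation (made-up shard id):
 *
 *   SELECT citus_internal_delete_shard_metadata(102008);
 */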
2643 
2644 
2645 /*
2646  * citus_internal_update_relation_colocation is an internal UDF to update
2647  * the colocation id of a relation in pg_dist_partition.
2649  */
2650 Datum
2651 citus_internal_update_relation_colocation(PG_FUNCTION_ARGS)
2652 {
2653 	CheckCitusVersion(ERROR);
2654 
2655 	Oid relationId = PG_GETARG_OID(0);
2656 	uint32 targetColocationId = PG_GETARG_UINT32(1);
2657 
2658 	EnsureTableOwner(relationId);
2659 
2660 	if (!ShouldSkipMetadataChecks())
2661 	{
2662 		/* this UDF is not allowed to be executed as a separate command */
2663 		EnsureCoordinatorInitiatedOperation();
2664 
2665 		/* ensure that the table is in pg_dist_partition */
2666 		char partitionMethod = PartitionMethodViaCatalog(relationId);
2667 		if (partitionMethod == DISTRIBUTE_BY_INVALID)
2668 		{
2669 			/* connection from the coordinator operating on a shard */
2670 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2671 							errmsg("The relation \"%s\" does not have a valid "
2672 								   "entry in pg_dist_partition.",
2673 								   get_rel_name(relationId))));
2674 		}
2675 		else if (partitionMethod != DISTRIBUTE_BY_HASH)
2676 		{
2677 			/* connection from the coordinator operating on a shard */
2678 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2679 							errmsg("Updating colocation ids is only allowed for hash "
2680 								   "distributed tables: %c", partitionMethod)));
2681 		}
2682 
2683 		int count = 1;
2684 		List *targetColocatedTableList =
2685 			ColocationGroupTableList(targetColocationId, count);
2686 
2687 		if (list_length(targetColocatedTableList) == 0)
2688 		{
2689 			/* the table is colocated with none, so nothing to check */
2690 		}
2691 		else
2692 		{
2693 			Oid targetRelationId = linitial_oid(targetColocatedTableList);
2694 
2695 			ErrorIfShardPlacementsNotColocated(relationId, targetRelationId);
2696 			CheckReplicationModel(relationId, targetRelationId);
2697 			CheckDistributionColumnType(relationId, targetRelationId);
2698 		}
2699 	}
2700 
2701 	bool localOnly = true;
2702 	UpdateRelationColocationGroup(relationId, targetColocationId, localOnly);
2703 
2704 	PG_RETURN_VOID();
2705 }
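
/*
 * A hypothetical invocation (made-up relation name and colocation id):
 *
 *   SELECT citus_internal_update_relation_colocation('dist_table'::regclass,
 *                                                    1400002);
 */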
2706