/*-------------------------------------------------------------------------
 *
 * metadata_sync.c
 *
 * Routines for synchronizing metadata to all workers.
 *
 * Copyright (c) Citus Data, Inc.
 *
 * $Id$
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "miscadmin.h"

#include <signal.h>
#include <sys/stat.h>
#include <unistd.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "commands/async.h"
#include "distributed/argutils.h"
#include "distributed/backend_data.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/distribution_column.h"
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/pg_dist_node.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "distributed/version_compat.h"
#include "executor/spi.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"


/* managed via a GUC */
char *EnableManualMetadataChangesForUser = "";


static void EnsureSequentialModeMetadataOperations(void);
static List * GetDistributedTableDDLEvents(Oid relationId);
static char * LocalGroupIdUpdateCommand(int32 groupId);
static List * SequenceDependencyCommandList(Oid relationId);
static char * TruncateTriggerCreateCommand(Oid relationId);
static char * SchemaOwnerName(Oid objectId);
static bool HasMetadataWorkers(void);
static List * DetachPartitionCommandList(void);
static bool SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError);
static void DropMetadataSnapshotOnNode(WorkerNode *workerNode);
static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId,
                                              char *columnName);
static List * GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid,
                                                      AclItem *aclItem);
static GrantStmt * GenerateGrantOnSchemaStmtForRights(Oid roleOid,
                                                      Oid schemaOid,
                                                      char *permission,
                                                      bool withGrantOption);
static char * GenerateSetRoleQuery(Oid roleOid);
static void MetadataSyncSigTermHandler(SIGNAL_ARGS);
static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS);


static bool ShouldSkipMetadataChecks(void);
static void EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod,
                                          int colocationId, char replicationModel,
                                          Var *distributionKey);
static void EnsureCoordinatorInitiatedOperation(void);
static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
                                      text *shardMinValue,
                                      text *shardMaxValue);
static void EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId,
                                               int64 placementId, int32 shardState,
                                               int64 shardLength, int32 groupId);

PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);


/*
 * Functions to modify metadata. Normally, modifying metadata requires
 * superuser. However, these functions can be called by superusers or by
 * regular users, as long as the regular user owns the input object.
 */
PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation);


static bool got_SIGTERM = false;
static bool got_SIGALRM = false;

#define METADATA_SYNC_APP_NAME "Citus Metadata Sync Daemon"


/*
 * start_metadata_sync_to_node function sets the hasmetadata column of the given
 * node to true, and then synchronizes the metadata on the node.
 */
Datum
start_metadata_sync_to_node(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    text *nodeName = PG_GETARG_TEXT_P(0);
    int32 nodePort = PG_GETARG_INT32(1);

    char *nodeNameString = text_to_cstring(nodeName);

    StartMetadataSyncToNode(nodeNameString, nodePort);

    PG_RETURN_VOID();
}


/*
 * StartMetadataSyncToNode is the internal API for
 * start_metadata_sync_to_node().
 */
void
StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
{
    char *escapedNodeName = quote_literal_cstr(nodeNameString);

    CheckCitusVersion(ERROR);
    EnsureCoordinator();
    EnsureSuperUser();
    EnsureModificationsCanRun();

    EnsureSequentialModeMetadataOperations();

    LockRelationOid(DistNodeRelationId(), ExclusiveLock);

    WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort);
    if (workerNode == NULL)
    {
        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                        errmsg("you cannot sync metadata to a non-existent node"),
                        errhint("First, add the node with SELECT citus_add_node"
                                "(%s,%d)", escapedNodeName, nodePort)));
    }

    if (!workerNode->isActive)
    {
        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                        errmsg("you cannot sync metadata to an inactive node"),
                        errhint("First, activate the node with "
                                "SELECT citus_activate_node(%s,%d)",
                                escapedNodeName, nodePort)));
    }

    if (NodeIsCoordinator(workerNode))
    {
        ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains "
                                "metadata, skipping syncing the metadata",
                                nodeNameString, nodePort)));
        return;
    }

    UseCoordinatedTransaction();

    /*
     * One would normally expect to set hasmetadata first, and then sync the metadata.
     * However, at this point we do it in the reverse order: we first set
     * metadatasynced, and then hasmetadata, because setting columns for nodes with
     * metadatasynced==false could cause errors (see ErrorIfAnyMetadataNodeOutOfSync).
     * We can safely do that because we are in a coordinated transaction and the
     * changes are only visible to our own transaction. If anything goes wrong, we
     * are going to roll back all the changes.
     */
    workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
                                 BoolGetDatum(true));
    workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata,
                                 BoolGetDatum(true));

    if (!NodeIsPrimary(workerNode))
    {
        /*
         * If this is a secondary node we can't actually sync metadata to it; we assume
         * the primary node is receiving metadata.
         */
        return;
    }

    /* fail if metadata synchronization doesn't succeed */
    bool raiseInterrupts = true;
    SyncMetadataSnapshotToNode(workerNode, raiseInterrupts);
}


/*
 * EnsureSequentialModeMetadataOperations makes sure that the current transaction is
 * already in sequential mode, or can still safely be put in sequential mode; it
 * errors out if that is not possible. The error contains information for the user
 * to retry the transaction with sequential mode set from the beginning.
 *
 * Metadata objects (e.g., a distributed table on the workers) exist as a single
 * instance that is potentially used by multiple shards/connections. To make sure
 * all shards/connections in the transaction can interact with the metadata, it
 * needs to be visible on all connections used by the transaction, meaning we can
 * only use one connection per node.
 */
static void
EnsureSequentialModeMetadataOperations(void)
{
    if (!IsTransactionBlock())
    {
        /* we do not need to switch to sequential mode if we are not in a transaction */
        return;
    }

    if (ParallelQueryExecutedInTransaction())
    {
        ereport(ERROR, (errmsg(
                            "cannot execute metadata syncing operation because there was a "
                            "parallel operation on a distributed table in the "
                            "transaction"),
                        errdetail("When modifying metadata, Citus needs to "
                                  "perform all operations over a single connection per "
                                  "node to ensure consistency."),
                        errhint("Try re-running the transaction with "
                                "\"SET LOCAL citus.multi_shard_modify_mode TO "
                                "\'sequential\';\"")));
    }

    ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
                     errdetail("Metadata synced or stopped syncing. To make "
                               "sure subsequent commands see the metadata correctly "
                               "we need to make sure to use only one connection for "
                               "all future commands")));
    SetLocalMultiShardModifyModeToSequential();
}


/*
 * stop_metadata_sync_to_node function sets the hasmetadata column of the specified
 * node to false in the pg_dist_node table, indicating that the specified worker
 * node no longer receives DDL changes and cannot be used for issuing queries.
 */
Datum
stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);
    EnsureCoordinator();
    EnsureSuperUser();

    text *nodeName = PG_GETARG_TEXT_P(0);
    int32 nodePort = PG_GETARG_INT32(1);
    bool clearMetadata = PG_GETARG_BOOL(2);
    char *nodeNameString = text_to_cstring(nodeName);

    LockRelationOid(DistNodeRelationId(), ExclusiveLock);

    WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeNameString, nodePort);
    if (workerNode == NULL)
    {
        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                        errmsg("node (%s,%d) does not exist", nodeNameString, nodePort)));
    }

    if (NodeIsCoordinator(workerNode))
    {
        ereport(NOTICE, (errmsg("node (%s,%d) is the coordinator and should have "
                                "metadata, skipping stopping the metadata sync",
                                nodeNameString, nodePort)));
        PG_RETURN_VOID();
    }

    if (clearMetadata)
    {
        if (NodeIsPrimary(workerNode))
        {
            ereport(NOTICE, (errmsg("dropping metadata on the node (%s,%d)",
                                    nodeNameString, nodePort)));
            DropMetadataSnapshotOnNode(workerNode);
        }
        else
        {
            /*
             * If this is a secondary node we can't actually clear metadata from it,
             * we assume the primary node is cleared.
             */
            ereport(NOTICE, (errmsg("(%s,%d) is a secondary node: to clear the metadata,"
                                    " you should clear metadata from the primary node",
                                    nodeNameString, nodePort)));
        }
    }

    workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata,
                                 BoolGetDatum(false));
    workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
                                 BoolGetDatum(false));

    PG_RETURN_VOID();
}


/*
 * ClusterHasKnownMetadataWorkers returns true if the node executing the function
 * knows at least one worker with metadata. We check this
 * (a) by checking whether the node that executes the function is itself a worker
 *     with metadata, or
 * (b) by checking whether the coordinator knows at least one worker with metadata.
 */
bool
ClusterHasKnownMetadataWorkers()
{
    bool workerWithMetadata = false;

    if (!IsCoordinator())
    {
        workerWithMetadata = true;
    }

    if (workerWithMetadata || HasMetadataWorkers())
    {
        return true;
    }

    return false;
}


/*
 * ShouldSyncTableMetadata checks whether the metadata of a distributed table should
 * be propagated to metadata workers, i.e. the table is an MX table or a reference
 * table. Tables with the streaming replication model (which implies a replication
 * factor of 1) and hash distribution are considered MX tables, while tables without
 * a distribution key are considered reference tables.
 */
bool
ShouldSyncTableMetadata(Oid relationId)
{
    if (!OidIsValid(relationId) || !IsCitusTable(relationId))
    {
        return false;
    }

    CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);

    bool streamingReplicated =
        (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING);

    bool mxTable = (streamingReplicated && IsCitusTableTypeCacheEntry(tableEntry,
                                                                      HASH_DISTRIBUTED));
    if (mxTable || IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
    {
        return true;
    }
    else
    {
        return false;
    }
}


/*
 * SyncMetadataSnapshotToNode does the following:
 *  1. Sets the localGroupId on the worker so the worker knows which tuple in
 *     pg_dist_node represents itself.
 *  2. Recreates the distributed metadata on the given worker.
 * If raiseOnError is true, it errors out if synchronization fails.
 */
static bool
SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
{
    char *currentUser = CurrentUserName();

    /* generate and add the local group id's update query */
    char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);

    /* generate the queries which drop the metadata */
    List *dropMetadataCommandList = MetadataDropCommands();

    /* generate the queries which create the metadata from scratch */
    List *createMetadataCommandList = MetadataCreateCommands();

    List *recreateMetadataSnapshotCommandList = list_make1(localGroupIdUpdateCommand);
    recreateMetadataSnapshotCommandList =
        list_concat(recreateMetadataSnapshotCommandList, dropMetadataCommandList);
    recreateMetadataSnapshotCommandList =
        list_concat(recreateMetadataSnapshotCommandList, createMetadataCommandList);

    /*
     * Send the snapshot recreation commands in a single remote transaction and,
     * if requested, error out on any kind of failure. Note that it is not
     * required to send createMetadataSnapshotCommandList in the same transaction
     * in which we send the nodeDeleteCommand and nodeInsertCommand commands below.
     */
    if (raiseOnError)
    {
        SendCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
                                                        workerNode->workerPort,
                                                        currentUser,
                                                        recreateMetadataSnapshotCommandList);
        return true;
    }
    else
    {
        bool success =
            SendOptionalCommandListToWorkerInCoordinatedTransaction(
                workerNode->workerName, workerNode->workerPort,
                currentUser, recreateMetadataSnapshotCommandList);

        return success;
    }
}


/*
 * DropMetadataSnapshotOnNode creates the queries which drop the metadata and
 * sends them to the given worker.
 */
static void
DropMetadataSnapshotOnNode(WorkerNode *workerNode)
{
    EnsureSequentialModeMetadataOperations();

    char *userName = CurrentUserName();

    /* generate the queries which drop the metadata */
    List *dropMetadataCommandList = MetadataDropCommands();

    dropMetadataCommandList = lappend(dropMetadataCommandList,
                                      LocalGroupIdUpdateCommand(0));

    SendOptionalCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
                                                            workerNode->workerPort,
                                                            userName,
                                                            dropMetadataCommandList);
}


/*
 * MetadataCreateCommands returns the list of queries that are
 * required to create the current metadata snapshot of the node on which the
 * function is called. The metadata snapshot commands include the
 * following queries:
 *
 * (i)   Query that populates pg_dist_node table
 * (ii)  Queries that create the clustered tables (including foreign keys,
 *       partitioning hierarchy etc.)
 * (iii) Queries that populate pg_dist_partition table referenced by (ii)
 * (iv)  Queries that populate pg_dist_shard table referenced by (iii)
 * (v)   Queries that populate pg_dist_placement table referenced by (iv)
 */
List *
MetadataCreateCommands(void)
{
    List *metadataSnapshotCommandList = NIL;
    List *distributedTableList = CitusTableList();
    List *propagatedTableList = NIL;
    bool includeNodesFromOtherClusters = true;
    List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters);
    IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS;

    /* make sure we have deterministic output for our tests */
    workerNodeList = SortList(workerNodeList, CompareWorkerNodes);

    /* generate insert command for pg_dist_node table */
    char *nodeListInsertCommand = NodeListInsertCommand(workerNodeList);
    metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
                                          nodeListInsertCommand);

    /* create the list of tables whose metadata will be created */
    CitusTableCacheEntry *cacheEntry = NULL;
    foreach_ptr(cacheEntry, distributedTableList)
    {
        if (ShouldSyncTableMetadata(cacheEntry->relationId))
        {
            propagatedTableList = lappend(propagatedTableList, cacheEntry);
        }
    }

    /* create the tables, but not the metadata */
    foreach_ptr(cacheEntry, propagatedTableList)
    {
        Oid relationId = cacheEntry->relationId;
        ObjectAddress tableAddress = { 0 };

        if (IsTableOwnedByExtension(relationId))
        {
            /* skip table creation when the Citus table is owned by an extension */
            continue;
        }

        List *ddlCommandList = GetFullTableCreationCommands(relationId,
                                                            includeSequenceDefaults);
        char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);

        /*
         * Tables might have dependencies on different objects. Since we create shards
         * for the table via multiple sessions, these objects will be created via their
         * own connection and committed immediately, so they become visible to all
         * sessions creating shards.
         */
        ObjectAddressSet(tableAddress, RelationRelationId, relationId);
        EnsureDependenciesExistOnAllNodes(&tableAddress);

        /*
         * Ensure sequence dependencies and mark them as distributed
         */
        List *attnumList = NIL;
        List *dependentSequenceList = NIL;
        GetDependentSequencesWithRelation(relationId, &attnumList,
                                          &dependentSequenceList, 0);
        MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList);

        List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
        metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
                                                  workerSequenceDDLCommands);

        /* ddlCommandList contains TableDDLCommand information, need to materialize */
        TableDDLCommand *tableDDLCommand = NULL;
        foreach_ptr(tableDDLCommand, ddlCommandList)
        {
            Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
            metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
                                                  GetTableDDLCommand(tableDDLCommand));
        }

        metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
                                              tableOwnerResetCommand);

        List *sequenceDependencyCommandList = SequenceDependencyCommandList(
            relationId);
        metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
                                                  sequenceDependencyCommandList);
    }

    /* construct the foreign key constraints after all tables are created */
    foreach_ptr(cacheEntry, propagatedTableList)
    {
        Oid relationId = cacheEntry->relationId;

        if (IsTableOwnedByExtension(relationId))
        {
            /* skip foreign key creation when the Citus table is owned by an extension */
            continue;
        }

        List *foreignConstraintCommands =
            GetReferencingForeignConstaintCommands(relationId);

        metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
                                                  foreignConstraintCommands);
    }

    /* construct partitioning hierarchy after all tables are created */
    foreach_ptr(cacheEntry, propagatedTableList)
    {
        Oid relationId = cacheEntry->relationId;

        if (IsTableOwnedByExtension(relationId))
        {
            /* skip partition creation when the Citus table is owned by an extension */
            continue;
        }

        if (PartitionTable(relationId))
        {
            char *alterTableAttachPartitionCommands =
                GenerateAlterTableAttachPartitionCommand(relationId);

            metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
                                                  alterTableAttachPartitionCommands);
        }
    }

    /* after all tables are created, create the metadata */
    foreach_ptr(cacheEntry, propagatedTableList)
    {
        Oid clusteredTableId = cacheEntry->relationId;

        /* add the table metadata command first */
        char *metadataCommand = DistributionCreateCommand(cacheEntry);
        metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
                                              metadataCommand);

        /* add the truncate trigger command after the table becomes distributed */
        char *truncateTriggerCreateCommand =
            TruncateTriggerCreateCommand(cacheEntry->relationId);
        metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
                                              truncateTriggerCreateCommand);

        /* add the pg_dist_shard{,placement} entries */
        List *shardIntervalList = LoadShardIntervalList(clusteredTableId);
        List *shardCreateCommandList = ShardListInsertCommand(shardIntervalList);

        metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
                                                  shardCreateCommandList);
    }

    return metadataSnapshotCommandList;
}


/*
 * GetDistributedTableDDLEvents returns the full set of DDL commands necessary to
 * create the given distributed table on a worker. The list includes setting up any
 * sequences, setting the owner of the table, inserting table and shard metadata,
 * setting the truncate trigger and foreign key constraints.
 */
static List *
GetDistributedTableDDLEvents(Oid relationId)
{
    CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);

    List *commandList = NIL;
    IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS;

    /* if the table is owned by an extension we only propagate pg_dist_* records */
    bool tableOwnedByExtension = IsTableOwnedByExtension(relationId);
    if (!tableOwnedByExtension)
    {
        /* commands to create sequences */
        List *sequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
        commandList = list_concat(commandList, sequenceDDLCommands);

        /*
         * Commands to create the table; these commands are TableDDLCommands, so let's
         * materialize them to the non-sharded version.
         */
        List *tableDDLCommands = GetFullTableCreationCommands(relationId,
                                                               includeSequenceDefaults);
        TableDDLCommand *tableDDLCommand = NULL;
        foreach_ptr(tableDDLCommand, tableDDLCommands)
        {
            Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
            commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand));
        }

        /* command to associate sequences with table */
        List *sequenceDependencyCommandList = SequenceDependencyCommandList(
            relationId);
        commandList = list_concat(commandList, sequenceDependencyCommandList);
    }

    /* command to insert pg_dist_partition entry */
    char *metadataCommand = DistributionCreateCommand(cacheEntry);
    commandList = lappend(commandList, metadataCommand);

    /* commands to create the truncate trigger of the table */
    char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
    commandList = lappend(commandList, truncateTriggerCreateCommand);

    /* commands to insert pg_dist_shard & pg_dist_placement entries */
    List *shardIntervalList = LoadShardIntervalList(relationId);
    List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList);
    commandList = list_concat(commandList, shardMetadataInsertCommandList);

    if (!tableOwnedByExtension)
    {
        /* commands to create foreign key constraints */
        List *foreignConstraintCommands =
            GetReferencingForeignConstaintCommands(relationId);
        commandList = list_concat(commandList, foreignConstraintCommands);

        /* commands to create partitioning hierarchy */
        if (PartitionTable(relationId))
        {
            char *alterTableAttachPartitionCommands =
                GenerateAlterTableAttachPartitionCommand(relationId);
            commandList = lappend(commandList, alterTableAttachPartitionCommands);
        }
    }

    return commandList;
}


/*
 * MetadataDropCommands returns the list of queries that are required to
 * drop all the metadata of the node that is related to clustered tables.
 * The drop metadata snapshot commands include the following queries:
 *
 * (i)   Query to disable DDL propagation (necessary for (ii))
 * (ii)  Queries that DETACH all partitions of distributed tables
 * (iii) Queries that delete all the rows from pg_dist_node table
 * (iv)  Queries that drop the clustered tables and remove their references from
 *       the pg_dist_partition table. Note that distributed relation ids are
 *       gathered from the worker itself to prevent dropping any non-distributed
 *       tables with the same name.
 * (v)   Queries that delete all the rows from pg_dist_shard table referenced by (iv)
 * (vi)  Queries that delete all the rows from pg_dist_placement table
 *       referenced by (v)
 */
List *
MetadataDropCommands(void)
{
    List *dropSnapshotCommandList = NIL;
    List *detachPartitionCommandList = DetachPartitionCommandList();

    dropSnapshotCommandList = list_concat(dropSnapshotCommandList,
                                          detachPartitionCommandList);

    dropSnapshotCommandList = lappend(dropSnapshotCommandList,
                                      REMOVE_ALL_CLUSTERED_TABLES_COMMAND);

    dropSnapshotCommandList = lappend(dropSnapshotCommandList, DELETE_ALL_NODES);

    return dropSnapshotCommandList;
}


/*
 * NodeListInsertCommand generates a single multi-row INSERT command that can be
 * executed to insert the nodes in workerNodeList into the pg_dist_node table.
 */
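/*
 * For illustration only (the node values below are hypothetical, not taken from
 * a live cluster), the generated command has the following shape, with one
 * VALUES row per worker node:
 *
 *   INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack,
 *     hasmetadata, metadatasynced, isactive, noderole, nodecluster,
 *     shouldhaveshards)
 *   VALUES (2, 2, 'worker-1', 5432, 'default', TRUE, TRUE, TRUE,
 *     'primary'::noderole, 'default', TRUE);
 */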
char *
NodeListInsertCommand(List *workerNodeList)
{
    StringInfo nodeListInsertCommand = makeStringInfo();
    int workerCount = list_length(workerNodeList);
    int processedWorkerNodeCount = 0;
    Oid primaryRole = PrimaryNodeRoleId();

    /* if there are no workers, return an empty command */
    if (workerCount == 0)
    {
        return nodeListInsertCommand->data;
    }

    if (primaryRole == InvalidOid)
    {
        ereport(ERROR, (errmsg("bad metadata, noderole does not exist"),
                        errdetail("you should never see this, please submit "
                                  "a bug report"),
                        errhint("run ALTER EXTENSION citus UPDATE and try again")));
    }

    /* generate the query without any values yet */
    appendStringInfo(nodeListInsertCommand,
                     "INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, "
                     "noderack, hasmetadata, metadatasynced, isactive, noderole, "
                     "nodecluster, shouldhaveshards) VALUES ");

    /* iterate over the worker nodes, add the values */
    WorkerNode *workerNode = NULL;
    foreach_ptr(workerNode, workerNodeList)
    {
        char *hasMetadataString = workerNode->hasMetadata ? "TRUE" : "FALSE";
        char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE";
        char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE";
        char *shouldHaveShards = workerNode->shouldHaveShards ? "TRUE" : "FALSE";

        Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole);
        Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum);
        char *nodeRoleString = DatumGetCString(nodeRoleStringDatum);

        appendStringInfo(nodeListInsertCommand,
                         "(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s)",
                         workerNode->nodeId,
                         workerNode->groupId,
                         quote_literal_cstr(workerNode->workerName),
                         workerNode->workerPort,
                         quote_literal_cstr(workerNode->workerRack),
                         hasMetadataString,
                         metadataSyncedString,
                         isActiveString,
                         nodeRoleString,
                         quote_literal_cstr(workerNode->nodeCluster),
                         shouldHaveShards);

        processedWorkerNodeCount++;
        if (processedWorkerNodeCount != workerCount)
        {
            appendStringInfo(nodeListInsertCommand, ",");
        }
    }

    return nodeListInsertCommand->data;
}


/*
 * DistributionCreateCommand generates a command that can be
 * executed to replicate the metadata for a distributed table.
 */
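/*
 * Hedged illustration (hypothetical table and column names): for a
 * hash-distributed table public.orders with distribution column order_id,
 * the generated command would look roughly like
 *
 *   SELECT citus_internal_add_partition_metadata
 *     ('public.orders'::regclass, 'h', 'order_id', 1, 's');
 */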
char *
DistributionCreateCommand(CitusTableCacheEntry *cacheEntry)
{
    StringInfo insertDistributionCommand = makeStringInfo();
    Oid relationId = cacheEntry->relationId;
    char distributionMethod = cacheEntry->partitionMethod;
    char *partitionKeyString = cacheEntry->partitionKeyString;
    char *qualifiedRelationName =
        generate_qualified_relation_name(relationId);
    uint32 colocationId = cacheEntry->colocationId;
    char replicationModel = cacheEntry->replicationModel;
    StringInfo tablePartitionKeyNameString = makeStringInfo();

    if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
    {
        appendStringInfo(tablePartitionKeyNameString, "NULL");
    }
    else
    {
        char *partitionKeyColumnName =
            ColumnToColumnName(relationId, partitionKeyString);
        appendStringInfo(tablePartitionKeyNameString, "%s",
                         quote_literal_cstr(partitionKeyColumnName));
    }

    appendStringInfo(insertDistributionCommand,
                     "SELECT citus_internal_add_partition_metadata "
                     "(%s::regclass, '%c', %s, %d, '%c')",
                     quote_literal_cstr(qualifiedRelationName),
                     distributionMethod,
                     tablePartitionKeyNameString->data,
                     colocationId,
                     replicationModel);

    return insertDistributionCommand->data;
}


/*
 * DistributionDeleteCommand generates a command that can be executed
 * to drop a distributed table and its metadata on a remote node.
 */
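/*
 * For example (hypothetical table name), the generated command is of the form
 * SELECT worker_drop_distributed_table('public.orders')
 */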
char *
DistributionDeleteCommand(const char *schemaName, const char *tableName)
{
    StringInfo deleteDistributionCommand = makeStringInfo();

    char *distributedRelationName = quote_qualified_identifier(schemaName, tableName);

    appendStringInfo(deleteDistributionCommand,
                     "SELECT worker_drop_distributed_table(%s)",
                     quote_literal_cstr(distributedRelationName));

    return deleteDistributionCommand->data;
}


/*
 * TableOwnerResetCommand generates a command that can be executed
 * to reset the table owner.
 */
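/*
 * For example (hypothetical table and role names), the generated command is of
 * the form ALTER TABLE public.orders OWNER TO app_user
 */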
char *
TableOwnerResetCommand(Oid relationId)
{
    StringInfo ownerResetCommand = makeStringInfo();
    char *qualifiedRelationName = generate_qualified_relation_name(relationId);
    char *tableOwnerName = TableOwner(relationId);

    appendStringInfo(ownerResetCommand,
                     "ALTER TABLE %s OWNER TO %s",
                     qualifiedRelationName,
                     quote_identifier(tableOwnerName));

    return ownerResetCommand->data;
}


/*
 * ShardListInsertCommand generates the commands that can be
 * executed to replicate shard and shard placement metadata for the
 * given shard intervals. All placements of each shard are included.
 */
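/*
 * Illustrative sketch of the generated shard command (hypothetical table name,
 * shard id and hash range), matching the format strings used below:
 *
 *   WITH shard_data(relationname, shardid, storagetype,
 *                   shardminvalue, shardmaxvalue) AS
 *     (VALUES ('public.orders'::regclass, 102008, 't'::"char",
 *              '-2147483648', '-1'))
 *   SELECT citus_internal_add_shard_metadata(relationname, shardid,
 *     storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
 *
 * A similar WITH placement_data(...) ... SELECT
 * citus_internal_add_placement_metadata(...) command is generated for the
 * placements.
 */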
List *
ShardListInsertCommand(List *shardIntervalList)
{
    List *commandList = NIL;
    int shardCount = list_length(shardIntervalList);

    /* if there are no shards, return empty list */
    if (shardCount == 0)
    {
        return commandList;
    }

    /* add placements to insertPlacementCommand */
    StringInfo insertPlacementCommand = makeStringInfo();
    appendStringInfo(insertPlacementCommand,
                     "WITH placement_data(shardid, shardstate, "
                     "shardlength, groupid, placementid) AS (VALUES ");

    ShardInterval *shardInterval = NULL;
    foreach_ptr(shardInterval, shardIntervalList)
    {
        uint64 shardId = shardInterval->shardId;
        List *shardPlacementList = ActiveShardPlacementList(shardId);

        ShardPlacement *placement = NULL;
        foreach_ptr(placement, shardPlacementList)
        {
            appendStringInfo(insertPlacementCommand,
                             "(%ld, %d, %ld, %d, %ld)",
                             shardId,
                             placement->shardState,
                             placement->shardLength,
                             placement->groupId,
                             placement->placementId);

            if (!(llast(shardPlacementList) == placement &&
                  llast(shardIntervalList) == shardInterval))
            {
                /*
                 * As long as this is not the last placement of the last shard,
                 * append the comma.
                 */
                appendStringInfo(insertPlacementCommand, ", ");
            }
        }
    }

    appendStringInfo(insertPlacementCommand, ") ");

    appendStringInfo(insertPlacementCommand,
                     "SELECT citus_internal_add_placement_metadata("
                     "shardid, shardstate, shardlength, groupid, placementid) "
                     "FROM placement_data;");

    /* now add shards to insertShardCommand */
    StringInfo insertShardCommand = makeStringInfo();
    appendStringInfo(insertShardCommand,
                     "WITH shard_data(relationname, shardid, storagetype, "
                     "shardminvalue, shardmaxvalue) AS (VALUES ");

    foreach_ptr(shardInterval, shardIntervalList)
    {
        uint64 shardId = shardInterval->shardId;
        Oid distributedRelationId = shardInterval->relationId;
        char *qualifiedRelationName = generate_qualified_relation_name(
            distributedRelationId);
        StringInfo minHashToken = makeStringInfo();
        StringInfo maxHashToken = makeStringInfo();

        if (shardInterval->minValueExists)
        {
            appendStringInfo(minHashToken, "'%d'", DatumGetInt32(
                                 shardInterval->minValue));
        }
        else
        {
            appendStringInfo(minHashToken, "NULL");
        }

        if (shardInterval->maxValueExists)
        {
            appendStringInfo(maxHashToken, "'%d'", DatumGetInt32(
                                 shardInterval->maxValue));
        }
        else
        {
            appendStringInfo(maxHashToken, "NULL");
        }

        appendStringInfo(insertShardCommand,
                         "(%s::regclass, %ld, '%c'::\"char\", %s, %s)",
                         quote_literal_cstr(qualifiedRelationName),
                         shardId,
                         shardInterval->storageType,
                         minHashToken->data,
                         maxHashToken->data);

        if (llast(shardIntervalList) != shardInterval)
        {
            appendStringInfo(insertShardCommand, ", ");
        }
    }

    appendStringInfo(insertShardCommand, ") ");

    appendStringInfo(insertShardCommand,
                     "SELECT citus_internal_add_shard_metadata(relationname, shardid, "
                     "storagetype, shardminvalue, shardmaxvalue) "
                     "FROM shard_data;");

    /* first insert shards, then the placements */
    commandList = lappend(commandList, insertShardCommand->data);
    commandList = lappend(commandList, insertPlacementCommand->data);

    return commandList;
}


/*
 * NodeDeleteCommand generates a command that can be
 * executed to delete the metadata for a worker node.
 */
char *
NodeDeleteCommand(uint32 nodeId)
{
    StringInfo nodeDeleteCommand = makeStringInfo();

    appendStringInfo(nodeDeleteCommand,
                     "DELETE FROM pg_dist_node "
                     "WHERE nodeid = %u", nodeId);

    return nodeDeleteCommand->data;
}


/*
 * NodeStateUpdateCommand generates a command that can be executed to update
 * isactive column of a node in pg_dist_node table.
 */
char *
NodeStateUpdateCommand(uint32 nodeId, bool isActive)
{
    StringInfo nodeStateUpdateCommand = makeStringInfo();
    char *isActiveString = isActive ? "TRUE" : "FALSE";

    appendStringInfo(nodeStateUpdateCommand,
                     "UPDATE pg_dist_node SET isactive = %s "
                     "WHERE nodeid = %u", isActiveString, nodeId);

    return nodeStateUpdateCommand->data;
}


/*
 * ShouldHaveShardsUpdateCommand generates a command that can be executed to
 * update the shouldhaveshards column of a node in pg_dist_node table.
 */
char *
ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards)
{
    StringInfo nodeStateUpdateCommand = makeStringInfo();
    char *shouldHaveShardsString = shouldHaveShards ? "TRUE" : "FALSE";

    appendStringInfo(nodeStateUpdateCommand,
                     "UPDATE pg_catalog.pg_dist_node SET shouldhaveshards = %s "
                     "WHERE nodeid = %u", shouldHaveShardsString, nodeId);

    return nodeStateUpdateCommand->data;
}


/*
 * ColocationIdUpdateCommand creates the SQL command to change the colocationId
 * of the table with the given name to the given colocationId in pg_dist_partition
 * table.
 */
char *
ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
{
    StringInfo command = makeStringInfo();
    char *qualifiedRelationName = generate_qualified_relation_name(relationId);
    appendStringInfo(command,
                     "SELECT citus_internal_update_relation_colocation(%s::regclass, %d)",
                     quote_literal_cstr(qualifiedRelationName), colocationId);

    return command->data;
}


/*
 * PlacementUpsertCommand creates a SQL command for upserting a pg_dist_placement
 * entry with the given properties. In the case of a conflict on placementId, the
 * command updates all properties (excluding the placementId) with the given ones.
 */
char *
PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
                       uint64 shardLength, int32 groupId)
{
    StringInfo command = makeStringInfo();

    appendStringInfo(command, UPSERT_PLACEMENT, shardId, shardState, shardLength,
                     groupId, placementId);

    return command->data;
}


/*
 * LocalGroupIdUpdateCommand creates the SQL command required to set the local group id
 * of a worker and returns the command in a string.
 */
static char *
LocalGroupIdUpdateCommand(int32 groupId)
{
    StringInfo updateCommand = makeStringInfo();

    appendStringInfo(updateCommand, "UPDATE pg_dist_local_group SET groupid = %d",
                     groupId);

    return updateCommand->data;
}


/*
 * SequenceDDLCommandsForTable returns a list of commands which create sequences (and
 * their schemas) to run on workers before creating the relation. The sequence creation
 * commands are wrapped with a `worker_apply_sequence_command` call, which sets the
 * sequence space uniquely for each worker. Notice that this function is relevant only
 * during metadata propagation to workers and adds nothing to the list of sequence
 * commands if none of the workers is marked as receiving metadata changes.
 */
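/*
 * Hedged illustration, assuming WORKER_APPLY_SEQUENCE_COMMAND expands to a
 * worker_apply_sequence_command(...) template (hypothetical sequence and role
 * names), each dependent sequence contributes two commands of roughly this shape:
 *
 *   SELECT worker_apply_sequence_command('CREATE SEQUENCE public.orders_id_seq ...',
 *                                        'bigint');
 *   ALTER SEQUENCE public.orders_id_seq OWNER TO app_user;
 */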
List *
SequenceDDLCommandsForTable(Oid relationId)
{
    List *sequenceDDLList = NIL;

    List *attnumList = NIL;
    List *dependentSequenceList = NIL;
    GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);

    char *ownerName = TableOwner(relationId);

    Oid sequenceOid = InvalidOid;
    foreach_oid(sequenceOid, dependentSequenceList)
    {
        char *sequenceDef = pg_get_sequencedef_string(sequenceOid);
        char *escapedSequenceDef = quote_literal_cstr(sequenceDef);
        StringInfo wrappedSequenceDef = makeStringInfo();
        StringInfo sequenceGrantStmt = makeStringInfo();
        char *sequenceName = generate_qualified_relation_name(sequenceOid);
        Form_pg_sequence sequenceData = pg_get_sequencedef(sequenceOid);
        Oid sequenceTypeOid = sequenceData->seqtypid;
        char *typeName = format_type_be(sequenceTypeOid);

        /* create schema if needed */
        appendStringInfo(wrappedSequenceDef,
                         WORKER_APPLY_SEQUENCE_COMMAND,
                         escapedSequenceDef,
                         quote_literal_cstr(typeName));

        appendStringInfo(sequenceGrantStmt,
                         "ALTER SEQUENCE %s OWNER TO %s", sequenceName,
                         quote_identifier(ownerName));

        sequenceDDLList = lappend(sequenceDDLList, wrappedSequenceDef->data);
        sequenceDDLList = lappend(sequenceDDLList, sequenceGrantStmt->data);
    }

    return sequenceDDLList;
}


/*
 * GetAttributeTypeOid returns the OID of the type of the attribute of
 * provided relationId that has the provided attnum
 */
Oid
GetAttributeTypeOid(Oid relationId, AttrNumber attnum)
{
    Oid resultOid = InvalidOid;

    ScanKeyData key[2];

    /* Grab an appropriate lock on the pg_attribute relation */
    Relation attrel = table_open(AttributeRelationId, AccessShareLock);

    /* use the index to scan the attributes of the target relation up to the given attnum */
    ScanKeyInit(&key[0],
                Anum_pg_attribute_attrelid,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(relationId));
    ScanKeyInit(&key[1],
                Anum_pg_attribute_attnum,
                BTLessEqualStrategyNumber, F_INT2LE,
                Int16GetDatum(attnum));

    SysScanDesc scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true, NULL, 2,
                                          key);

    HeapTuple attributeTuple;
    while (HeapTupleIsValid(attributeTuple = systable_getnext(scan)))
    {
        Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple);
        resultOid = att->atttypid;
    }

    systable_endscan(scan);
    table_close(attrel, AccessShareLock);

    return resultOid;
}


/*
 * GetDependentSequencesWithRelation appends the attnum and OID of sequences that
 * have a direct (owned sequences) or indirect dependency on the given relationId
 * to the lists passed in as NIL initially.
 * For both cases, we use the intermediate AttrDefault object from pg_depend.
 * If attnum is specified, we only return the sequences related to that
 * attribute of the relationId.
 */
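/*
 * Typical usage, as in MetadataCreateCommands() and SequenceDDLCommandsForTable()
 * above (0 as attnum means "all attributes"):
 *
 *   List *attnumList = NIL;
 *   List *dependentSequenceList = NIL;
 *   GetDependentSequencesWithRelation(relationId, &attnumList,
 *                                     &dependentSequenceList, 0);
 */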
void
GetDependentSequencesWithRelation(Oid relationId, List **attnumList,
                                  List **dependentSequenceList, AttrNumber attnum)
{
    Assert(*attnumList == NIL && *dependentSequenceList == NIL);

    List *attrdefResult = NIL;
    List *attrdefAttnumResult = NIL;
    ScanKeyData key[3];
    HeapTuple tup;

    Relation depRel = table_open(DependRelationId, AccessShareLock);

    ScanKeyInit(&key[0],
                Anum_pg_depend_refclassid,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(RelationRelationId));
    ScanKeyInit(&key[1],
                Anum_pg_depend_refobjid,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(relationId));
    if (attnum)
    {
        ScanKeyInit(&key[2],
                    Anum_pg_depend_refobjsubid,
                    BTEqualStrategyNumber, F_INT4EQ,
                    Int32GetDatum(attnum));
    }

    SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true,
                                          NULL, attnum ? 3 : 2, key);

    while (HeapTupleIsValid(tup = systable_getnext(scan)))
    {
        Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);

        if (deprec->classid == AttrDefaultRelationId &&
            deprec->objsubid == 0 &&
            deprec->refobjsubid != 0 &&
            deprec->deptype == DEPENDENCY_AUTO)
        {
            attrdefResult = lappend_oid(attrdefResult, deprec->objid);
            attrdefAttnumResult = lappend_int(attrdefAttnumResult, deprec->refobjsubid);
        }
    }

    systable_endscan(scan);

    table_close(depRel, AccessShareLock);

    ListCell *attrdefOidCell = NULL;
    ListCell *attrdefAttnumCell = NULL;
    forboth(attrdefOidCell, attrdefResult, attrdefAttnumCell, attrdefAttnumResult)
    {
        Oid attrdefOid = lfirst_oid(attrdefOidCell);
        AttrNumber attrdefAttnum = lfirst_int(attrdefAttnumCell);

        List *sequencesFromAttrDef = GetSequencesFromAttrDef(attrdefOid);

        /* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */
        if (list_length(sequencesFromAttrDef) > 1)
        {
            ereport(ERROR, (errmsg(
                                "More than one sequence in a column default"
                                " is not supported for distribution "
                                "or for adding local tables to metadata")));
        }

        if (list_length(sequencesFromAttrDef) == 1)
        {
            *dependentSequenceList = list_concat(*dependentSequenceList,
                                                 sequencesFromAttrDef);
            *attnumList = lappend_int(*attnumList, attrdefAttnum);
        }
    }
}


/*
 * GetSequencesFromAttrDef returns a list of sequence OIDs that have a
 * dependency on the given attrdefOid in pg_depend.
 */
List *
GetSequencesFromAttrDef(Oid attrdefOid)
{
    List *sequencesResult = NIL;
    ScanKeyData key[2];
    HeapTuple tup;

    Relation depRel = table_open(DependRelationId, AccessShareLock);

    ScanKeyInit(&key[0],
                Anum_pg_depend_classid,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(AttrDefaultRelationId));
    ScanKeyInit(&key[1],
                Anum_pg_depend_objid,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(attrdefOid));

    SysScanDesc scan = systable_beginscan(depRel, DependDependerIndexId, true,
                                          NULL, 2, key);

    while (HeapTupleIsValid(tup = systable_getnext(scan)))
    {
        Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);

        if (deprec->refclassid == RelationRelationId &&
            deprec->deptype == DEPENDENCY_NORMAL &&
            get_rel_relkind(deprec->refobjid) == RELKIND_SEQUENCE)
        {
            sequencesResult = lappend_oid(sequencesResult, deprec->refobjid);
        }
    }

    systable_endscan(scan);

    table_close(depRel, AccessShareLock);

    return sequencesResult;
}


/*
 * SequenceDependencyCommandList generates commands to record the dependency
 * of sequences on tables on the worker. This dependency does not exist by
 * default since the sequences and table are created separately, but it is
 * necessary to ensure that the sequence is dropped when the table is
 * dropped.
 */
static List *
SequenceDependencyCommandList(Oid relationId)
{
    List *sequenceCommandList = NIL;
    List *columnNameList = NIL;
    List *sequenceIdList = NIL;

    ExtractDefaultColumnsAndOwnedSequences(relationId, &columnNameList, &sequenceIdList);

    ListCell *columnNameCell = NULL;
    ListCell *sequenceIdCell = NULL;

    forboth(columnNameCell, columnNameList, sequenceIdCell, sequenceIdList)
    {
        char *columnName = lfirst(columnNameCell);
        Oid sequenceId = lfirst_oid(sequenceIdCell);

        if (!OidIsValid(sequenceId))
        {
            /*
             * ExtractDefaultColumnsAndOwnedSequences returns entries for all columns,
             * but with 0 sequence ID unless there is default nextval(..).
             */
            continue;
        }

        char *sequenceDependencyCommand =
            CreateSequenceDependencyCommand(relationId, sequenceId, columnName);

        sequenceCommandList = lappend(sequenceCommandList,
                                      sequenceDependencyCommand);
    }

    return sequenceCommandList;
}


/*
 * CreateSequenceDependencyCommand generates a query string for calling
 * worker_record_sequence_dependency on the worker to recreate a sequence->table
 * dependency.
 */
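/*
 * For example (hypothetical names), the generated query is of the form
 *   SELECT pg_catalog.worker_record_sequence_dependency
 *     ('public.orders_id_seq'::regclass,'public.orders'::regclass,'id')
 */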
static char *
CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId, char *columnName)
{
    char *relationName = generate_qualified_relation_name(relationId);
    char *sequenceName = generate_qualified_relation_name(sequenceId);

    StringInfo sequenceDependencyCommand = makeStringInfo();

    appendStringInfo(sequenceDependencyCommand,
                     "SELECT pg_catalog.worker_record_sequence_dependency"
                     "(%s::regclass,%s::regclass,%s)",
                     quote_literal_cstr(sequenceName),
                     quote_literal_cstr(relationName),
                     quote_literal_cstr(columnName));

    return sequenceDependencyCommand->data;
}


/*
 * worker_record_sequence_dependency records the fact that the sequence depends on
 * the table in pg_depend, such that it will be automatically dropped.
 */
Datum
worker_record_sequence_dependency(PG_FUNCTION_ARGS)
{
    Oid sequenceOid = PG_GETARG_OID(0);
    Oid relationOid = PG_GETARG_OID(1);
    Name columnName = PG_GETARG_NAME(2);
    const char *columnNameStr = NameStr(*columnName);

    /* lookup column definition */
    HeapTuple columnTuple = SearchSysCacheAttName(relationOid, columnNameStr);
    if (!HeapTupleIsValid(columnTuple))
    {
        ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN),
                        errmsg("column \"%s\" does not exist",
                               columnNameStr)));
    }

    Form_pg_attribute columnForm = (Form_pg_attribute) GETSTRUCT(columnTuple);
    if (columnForm->attnum <= 0)
    {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                        errmsg("cannot create dependency on system column \"%s\"",
                               columnNameStr)));
    }

    ObjectAddress sequenceAddr = {
        .classId = RelationRelationId,
        .objectId = sequenceOid,
        .objectSubId = 0
    };
    ObjectAddress relationAddr = {
        .classId = RelationRelationId,
        .objectId = relationOid,
        .objectSubId = columnForm->attnum
    };


    EnsureTableOwner(sequenceOid);
    EnsureTableOwner(relationOid);

    /* dependency from sequence to table */
    recordDependencyOn(&sequenceAddr, &relationAddr, DEPENDENCY_AUTO);

    ReleaseSysCache(columnTuple);

    PG_RETURN_VOID();
}


/*
 * CreateSchemaDDLCommand returns a "CREATE SCHEMA..." SQL string for creating the
 * given schema if it does not exist, with proper authorization.
 */
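/*
 * Hedged illustration, assuming CREATE_SCHEMA_COMMAND is an
 * "IF NOT EXISTS ... AUTHORIZATION ..." template (hypothetical names):
 *   CREATE SCHEMA IF NOT EXISTS app_schema AUTHORIZATION app_user
 */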
char *
CreateSchemaDDLCommand(Oid schemaId)
{
    char *schemaName = get_namespace_name(schemaId);

    StringInfo schemaNameDef = makeStringInfo();
    const char *quotedSchemaName = quote_identifier(schemaName);
    const char *ownerName = quote_identifier(SchemaOwnerName(schemaId));
    appendStringInfo(schemaNameDef, CREATE_SCHEMA_COMMAND, quotedSchemaName, ownerName);

    return schemaNameDef->data;
}


/*
 * GrantOnSchemaDDLCommands creates a list of DDL commands for replicating the
 * permissions of roles on schemas.
 */
List *
GrantOnSchemaDDLCommands(Oid schemaOid)
{
    HeapTuple schemaTuple = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(schemaOid));
    bool isNull = true;
    Datum aclDatum = SysCacheGetAttr(NAMESPACEOID, schemaTuple, Anum_pg_namespace_nspacl,
                                     &isNull);
    if (isNull)
    {
        ReleaseSysCache(schemaTuple);
        return NIL;
    }
    Acl *acl = DatumGetAclPCopy(aclDatum);
    AclItem *aclDat = ACL_DAT(acl);
    int aclNum = ACL_NUM(acl);
    List *commands = NIL;

    ReleaseSysCache(schemaTuple);

    for (int i = 0; i < aclNum; i++)
    {
        commands = list_concat(commands,
                               GenerateGrantOnSchemaQueriesFromAclItem(
                                   schemaOid,
                                   &aclDat[i]));
    }

    return commands;
}


/*
 * GenerateGrantOnSchemaQueriesFromAclItem generates the query strings needed to
 * replicate a user's permissions on a schema.
 */
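/*
 * Illustrative output for a single ACL item (hypothetical role and schema
 * names), matching the code below:
 *
 *   SET ROLE grantor_role
 *   GRANT USAGE ON SCHEMA app_schema TO app_user
 *   GRANT CREATE ON SCHEMA app_schema TO app_user
 *   RESET ROLE
 */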
List *
GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid, AclItem *aclItem)
{
    AclMode permissions = ACLITEM_GET_PRIVS(*aclItem) & ACL_ALL_RIGHTS_SCHEMA;
    AclMode grants = ACLITEM_GET_GOPTIONS(*aclItem) & ACL_ALL_RIGHTS_SCHEMA;

    /*
     * It seems unlikely, but we check that there is no grant option without the
     * corresponding permission.
     */
    Assert(!(grants & ACL_USAGE) || (permissions & ACL_USAGE));
    Assert(!(grants & ACL_CREATE) || (permissions & ACL_CREATE));
    Oid granteeOid = aclItem->ai_grantee;
    List *queries = NIL;

    queries = lappend(queries, GenerateSetRoleQuery(aclItem->ai_grantor));

    if (permissions & ACL_USAGE)
    {
        char *query = DeparseTreeNode((Node *) GenerateGrantOnSchemaStmtForRights(
                                          granteeOid, schemaOid, "USAGE",
                                          grants & ACL_USAGE));
        queries = lappend(queries, query);
    }
    if (permissions & ACL_CREATE)
    {
        char *query = DeparseTreeNode((Node *) GenerateGrantOnSchemaStmtForRights(
                                          granteeOid, schemaOid, "CREATE",
                                          grants & ACL_CREATE));
        queries = lappend(queries, query);
    }

    queries = lappend(queries, "RESET ROLE");

    return queries;
}


GrantStmt *
GenerateGrantOnSchemaStmtForRights(Oid roleOid,
                                   Oid schemaOid,
                                   char *permission,
                                   bool withGrantOption)
{
    AccessPriv *accessPriv = makeNode(AccessPriv);
    accessPriv->priv_name = permission;
    accessPriv->cols = NULL;

    RoleSpec *roleSpec = makeNode(RoleSpec);
    roleSpec->roletype = OidIsValid(roleOid) ? ROLESPEC_CSTRING : ROLESPEC_PUBLIC;
    roleSpec->rolename = OidIsValid(roleOid) ? GetUserNameFromId(roleOid, false) : NULL;
    roleSpec->location = -1;

    GrantStmt *stmt = makeNode(GrantStmt);
    stmt->is_grant = true;
    stmt->targtype = ACL_TARGET_OBJECT;
    stmt->objtype = OBJECT_SCHEMA;
    stmt->objects = list_make1(makeString(get_namespace_name(schemaOid)));
    stmt->privileges = list_make1(accessPriv);
    stmt->grantees = list_make1(roleSpec);
    stmt->grant_option = withGrantOption;
    return stmt;
}


static char *
GenerateSetRoleQuery(Oid roleOid)
{
    StringInfo buf = makeStringInfo();
    appendStringInfo(buf, "SET ROLE %s", quote_identifier(GetUserNameFromId(roleOid,
                                                                            false)));
    return buf->data;
}


/*
 * TruncateTriggerCreateCommand creates a SQL query calling worker_create_truncate_trigger
 * function, which creates the truncate trigger on the worker.
 */
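/*
 * For example (hypothetical table name), the generated query is of the form
 * SELECT worker_create_truncate_trigger('public.orders')
 */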
1607 static char *
TruncateTriggerCreateCommand(Oid relationId)1608 TruncateTriggerCreateCommand(Oid relationId)
1609 {
1610 StringInfo triggerCreateCommand = makeStringInfo();
1611 char *tableName = generate_qualified_relation_name(relationId);
1612
1613 appendStringInfo(triggerCreateCommand,
1614 "SELECT worker_create_truncate_trigger(%s)",
1615 quote_literal_cstr(tableName));
1616
1617 return triggerCreateCommand->data;
1618 }
1619
1620
/*
 * SchemaOwnerName returns the name of the owner of the specified schema.
 */
static char *
SchemaOwnerName(Oid objectId)
{
	Oid ownerId = InvalidOid;

	HeapTuple tuple = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(objectId));
	if (HeapTupleIsValid(tuple))
	{
		ownerId = ((Form_pg_namespace) GETSTRUCT(tuple))->nspowner;
	}
	else
	{
		ownerId = GetUserId();
	}

	char *ownerName = GetUserNameFromId(ownerId, false);

	/* only release the tuple if the syscache lookup actually succeeded */
	if (HeapTupleIsValid(tuple))
	{
		ReleaseSysCache(tuple);
	}

	return ownerName;
}


/*
 * HasMetadataWorkers returns true if any of the workers in the cluster has its
 * hasmetadata column set to true, which happens when the
 * start_metadata_sync_to_node command is run.
 */
static bool
HasMetadataWorkers(void)
{
	List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);

	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerNodeList)
	{
		if (workerNode->hasMetadata)
		{
			return true;
		}
	}

	return false;
}


/*
 * CreateTableMetadataOnWorkers creates the list of commands needed to create the
 * given distributed table and sends these commands to all metadata workers, i.e.
 * workers with hasmetadata=true. Before sending the commands, DDL propagation on
 * the workers is disabled with a `SET citus.enable_ddl_propagation TO off;`
 * command in order to prevent recursive propagation.
 */
void
CreateTableMetadataOnWorkers(Oid relationId)
{
	List *commandList = GetDistributedTableDDLEvents(relationId);

	/* prevent recursive propagation */
	SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);

	/* send the commands one by one */
	const char *command = NULL;
	foreach_ptr(command, commandList)
	{
		SendCommandToWorkersWithMetadata(command);
	}
}


/*
 * DetachPartitionCommandList returns the list of DETACH commands needed to detach
 * the partitions of all distributed tables. This function is used to detach the
 * partitions on MX workers before DROPping distributed partitioned tables on them.
 * Therefore, we disable DDL propagation at the beginning of the command list and
 * re-enable it at the end to switch back to the original state. As an extra step,
 * if there are no partitions to DETACH, this function simply returns an empty list
 * so that we do not disable/enable DDL propagation for nothing.
 */
static List *
DetachPartitionCommandList(void)
{
	List *detachPartitionCommandList = NIL;
	List *distributedTableList = CitusTableList();

	/* we iterate over all distributed partitioned tables and DETACH their partitions */
	CitusTableCacheEntry *cacheEntry = NULL;
	foreach_ptr(cacheEntry, distributedTableList)
	{
		if (!PartitionedTable(cacheEntry->relationId))
		{
			continue;
		}

		List *partitionList = PartitionList(cacheEntry->relationId);
		Oid partitionRelationId = InvalidOid;
		foreach_oid(partitionRelationId, partitionList)
		{
			char *detachPartitionCommand =
				GenerateDetachPartitionCommand(partitionRelationId);

			detachPartitionCommandList = lappend(detachPartitionCommandList,
												 detachPartitionCommand);
		}
	}

	if (list_length(detachPartitionCommandList) == 0)
	{
		return NIL;
	}

	detachPartitionCommandList =
		lcons(DISABLE_DDL_PROPAGATION, detachPartitionCommandList);

	/*
	 * We probably do not need this but as an extra precaution, we are enabling
	 * DDL propagation to switch back to the original state.
	 */
	detachPartitionCommandList = lappend(detachPartitionCommandList,
										 ENABLE_DDL_PROPAGATION);

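	/*
	 * Illustrative shape of the returned list (table and partition names are
	 * hypothetical; the exact DETACH text comes from
	 * GenerateDetachPartitionCommand()):
	 *
	 *   SET citus.enable_ddl_propagation TO 'off'
	 *   ALTER TABLE IF EXISTS events DETACH PARTITION events_2021
	 *   SET citus.enable_ddl_propagation TO 'on'
	 */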
	return detachPartitionCommandList;
}


/*
 * SyncMetadataToNodes tries recreating the metadata snapshot on the
 * metadata workers that are out of sync. Returns the result of the
 * synchronization.
 */
static MetadataSyncResult
SyncMetadataToNodes(void)
{
	MetadataSyncResult result = METADATA_SYNC_SUCCESS;

	if (!IsCoordinator())
	{
		return METADATA_SYNC_SUCCESS;
	}

	/*
	 * Request a RowExclusiveLock so we don't run concurrently with other
	 * functions updating pg_dist_node, but allow concurrency with functions
	 * which are just reading from pg_dist_node.
	 */
	if (!ConditionalLockRelationOid(DistNodeRelationId(), RowExclusiveLock))
	{
		return METADATA_SYNC_FAILED_LOCK;
	}

	List *syncedWorkerList = NIL;
	List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerList)
	{
		if (workerNode->hasMetadata && !workerNode->metadataSynced)
		{
			bool raiseInterrupts = false;
			if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
			{
				ereport(WARNING, (errmsg("failed to sync metadata to %s:%d",
										 workerNode->workerName,
										 workerNode->workerPort)));
				result = METADATA_SYNC_FAILED_SYNC;
			}
			else
			{
				/* remember successfully synced nodes; we set their metadatasynced column below */
				syncedWorkerList = lappend(syncedWorkerList, workerNode);
			}
		}
	}

	foreach_ptr(workerNode, syncedWorkerList)
	{
		SetWorkerColumnOptional(workerNode, Anum_pg_dist_node_metadatasynced,
								BoolGetDatum(true));

		/* we fetch the same node again to check if it's synced or not */
		WorkerNode *nodeUpdated = FindWorkerNode(workerNode->workerName,
												 workerNode->workerPort);
		if (!nodeUpdated->metadataSynced)
		{
			/* set the result to FAILED to trigger the sync again */
			result = METADATA_SYNC_FAILED_SYNC;
		}
	}

	return result;
}


/*
 * SyncMetadataToNodesMain is the main function for syncing metadata to
 * MX nodes. It retries until success and then exits.
 */
void
SyncMetadataToNodesMain(Datum main_arg)
{
	Oid databaseOid = DatumGetObjectId(main_arg);

	/* extension owner is passed via bgw_extra */
	Oid extensionOwner = InvalidOid;
	memcpy_s(&extensionOwner, sizeof(extensionOwner),
			 MyBgworkerEntry->bgw_extra, sizeof(Oid));

	pqsignal(SIGTERM, MetadataSyncSigTermHandler);
	pqsignal(SIGALRM, MetadataSyncSigAlrmHandler);
	BackgroundWorkerUnblockSignals();

	/* connect to the database, after that we can actually access catalogs */
	BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);

	/* make the worker recognizable in pg_stat_activity */
	pgstat_report_appname(METADATA_SYNC_APP_NAME);

	bool syncedAllNodes = false;

	while (!syncedAllNodes)
	{
		InvalidateMetadataSystemCache();
		StartTransactionCommand();

		/*
		 * Some functions in ruleutils.c, which we use to get the DDL for
		 * metadata propagation, require an active snapshot.
		 */
		PushActiveSnapshot(GetTransactionSnapshot());

		if (!LockCitusExtension())
		{
			ereport(DEBUG1, (errmsg("could not lock the citus extension, "
									"skipping metadata sync")));
		}
		else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
		{
			UseCoordinatedTransaction();
			MetadataSyncResult result = SyncMetadataToNodes();

			syncedAllNodes = (result == METADATA_SYNC_SUCCESS);

			/* we use LISTEN/NOTIFY to wait for metadata syncing in tests */
			if (result != METADATA_SYNC_FAILED_LOCK)
			{
				Async_Notify(METADATA_SYNC_CHANNEL, NULL);
			}
		}

		PopActiveSnapshot();
		CommitTransactionCommand();
		ProcessCompletedNotifies();

		if (syncedAllNodes)
		{
			break;
		}

		/*
		 * If the backend is cancelled (e.g. because of a distributed deadlock),
		 * CHECK_FOR_INTERRUPTS() will raise a cancellation error which will
		 * result in exit(1).
		 */
		CHECK_FOR_INTERRUPTS();

		/*
		 * SIGTERM is used when the maintenance daemon tries to clean up
		 * metadata sync daemons spawned by terminated maintenance daemons.
		 */
		if (got_SIGTERM)
		{
			exit(0);
		}

		/*
		 * SIGALRM is used for testing purposes and it simulates an error in the
		 * metadata sync daemon.
		 */
		if (got_SIGALRM)
		{
			elog(ERROR, "Error in metadata sync daemon");
		}

		pg_usleep(MetadataSyncRetryInterval * 1000);
	}
}


/*
 * MetadataSyncSigTermHandler sets a flag to request termination of the
 * metadata sync daemon.
 */
static void
MetadataSyncSigTermHandler(SIGNAL_ARGS)
{
	int save_errno = errno;

	got_SIGTERM = true;
	if (MyProc != NULL)
	{
		SetLatch(&MyProc->procLatch);
	}

	errno = save_errno;
}


/*
 * MetadataSyncSigAlrmHandler sets a flag to request an error in the metadata
 * sync daemon. This is used for testing purposes.
 */
static void
MetadataSyncSigAlrmHandler(SIGNAL_ARGS)
{
	int save_errno = errno;

	got_SIGALRM = true;
	if (MyProc != NULL)
	{
		SetLatch(&MyProc->procLatch);
	}

	errno = save_errno;
}


/*
 * SpawnSyncMetadataToNodes starts a background worker which runs metadata
 * sync. On success it returns the worker's handle. Otherwise it returns NULL.
 */
BackgroundWorkerHandle *
SpawnSyncMetadataToNodes(Oid database, Oid extensionOwner)
{
	BackgroundWorker worker;
	BackgroundWorkerHandle *handle = NULL;

	/* Configure a worker. */
	memset(&worker, 0, sizeof(worker));
	SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
				 "Citus Metadata Sync: %u/%u",
				 database, extensionOwner);
	worker.bgw_flags =
		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;

	/* don't restart, we manage restarts from maintenance daemon */
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
			 "SyncMetadataToNodesMain");
	worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
	memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
			 sizeof(Oid));
	worker.bgw_notify_pid = MyProcPid;

	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
	{
		return NULL;
	}

	pid_t pid;
	WaitForBackgroundWorkerStartup(handle, &pid);

	return handle;
}


/*
 * SignalMetadataSyncDaemon signals metadata sync daemons belonging to
 * the given database.
 */
void
SignalMetadataSyncDaemon(Oid database, int sig)
{
	int backendCount = pgstat_fetch_stat_numbackends();
	for (int backend = 1; backend <= backendCount; backend++)
	{
		LocalPgBackendStatus *localBeEntry = pgstat_fetch_stat_local_beentry(backend);
		if (!localBeEntry)
		{
			continue;
		}

		PgBackendStatus *beStatus = &localBeEntry->backendStatus;
		if (beStatus->st_databaseid == database &&
			strncmp(beStatus->st_appname, METADATA_SYNC_APP_NAME, BGW_MAXLEN) == 0)
		{
			kill(beStatus->st_procpid, sig);
		}
	}
}


/*
 * ShouldInitiateMetadataSync returns whether the metadata sync daemon should be
 * initiated. It sets lockFailure to true if the pg_dist_node lock couldn't be
 * acquired for the check.
 */
bool
ShouldInitiateMetadataSync(bool *lockFailure)
{
	if (!IsCoordinator())
	{
		*lockFailure = false;
		return false;
	}

	Oid distNodeOid = DistNodeRelationId();
	if (!ConditionalLockRelationOid(distNodeOid, AccessShareLock))
	{
		*lockFailure = true;
		return false;
	}

	bool shouldSyncMetadata = false;

	List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerList)
	{
		if (workerNode->hasMetadata && !workerNode->metadataSynced)
		{
			shouldSyncMetadata = true;
			break;
		}
	}

	UnlockRelationOid(distNodeOid, AccessShareLock);

	*lockFailure = false;
	return shouldSyncMetadata;
}


/*
 * citus_internal_add_partition_metadata is an internal UDF to
 * add a row to pg_dist_partition.
 */
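/*
 * The UDF is normally invoked by the coordinator within a distributed
 * transaction rather than run by hand. An illustrative call (the relation,
 * column, and colocation id are hypothetical) only shows the argument shape:
 *
 *   SELECT citus_internal_add_partition_metadata(
 *              'dist_table'::regclass, 'h', 'user_id', 1390001, 's');
 */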
Datum
citus_internal_add_partition_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	PG_ENSURE_ARGNOTNULL(0, "relation");
	Oid relationId = PG_GETARG_OID(0);

	PG_ENSURE_ARGNOTNULL(1, "distribution method");
	char distributionMethod = PG_GETARG_CHAR(1);

	PG_ENSURE_ARGNOTNULL(3, "colocation id");
	int colocationId = PG_GETARG_INT32(3);

	PG_ENSURE_ARGNOTNULL(4, "replication model");
	char replicationModel = PG_GETARG_CHAR(4);

	text *distributionColumnText = NULL;
	char *distributionColumnString = NULL;
	Var *distributionColumnVar = NULL;

	/* only the owner of the table (or superuser) is allowed to add the Citus metadata */
	EnsureTableOwner(relationId);

	/* we want to serialize all the metadata changes to this table */
	LockRelationOid(relationId, ShareUpdateExclusiveLock);

	if (!PG_ARGISNULL(2))
	{
		distributionColumnText = PG_GETARG_TEXT_P(2);
		distributionColumnString = text_to_cstring(distributionColumnText);

		Relation relation = relation_open(relationId, AccessShareLock);
		distributionColumnVar =
			BuildDistributionKeyFromColumnName(relation, distributionColumnString);
		Assert(distributionColumnVar != NULL);

		relation_close(relation, NoLock);
	}

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();

		if (distributionMethod == DISTRIBUTE_BY_NONE && distributionColumnVar != NULL)
		{
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("Reference or local tables cannot have "
								   "distribution columns")));
		}
		else if (distributionMethod != DISTRIBUTE_BY_NONE &&
				 distributionColumnVar == NULL)
		{
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("Distribution column cannot be NULL for "
								   "relation \"%s\"", get_rel_name(relationId))));
		}

		/*
		 * Even if the table owner is a malicious user and the partition
		 * metadata is not sane, the user can only affect their own tables.
		 * Given that the user is the owner of the table, we should allow it.
		 */
		EnsurePartitionMetadataIsSane(relationId, distributionMethod, colocationId,
									  replicationModel, distributionColumnVar);
	}

	InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumnVar,
							  colocationId, replicationModel);

	PG_RETURN_VOID();
}


/*
 * EnsurePartitionMetadataIsSane ensures that the input values are safe
 * for inserting into pg_dist_partition metadata.
 */
static void
EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod, int colocationId,
							  char replicationModel, Var *distributionColumnVar)
{
	if (!(distributionMethod == DISTRIBUTE_BY_HASH ||
		  distributionMethod == DISTRIBUTE_BY_NONE))
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Metadata syncing is only allowed for hash, reference "
							   "and local tables: %c", distributionMethod)));
	}

	if (colocationId < INVALID_COLOCATION_ID)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Metadata syncing is only allowed for valid "
							   "colocation id values.")));
	}
	else if (colocationId != INVALID_COLOCATION_ID &&
			 distributionMethod == DISTRIBUTE_BY_HASH)
	{
		int count = 1;
		List *targetColocatedTableList =
			ColocationGroupTableList(colocationId, count);

		/*
		 * If we have any colocated hash tables, ensure that they share the
		 * same distribution key properties.
		 */
		if (list_length(targetColocatedTableList) >= 1)
		{
			Oid targetRelationId = linitial_oid(targetColocatedTableList);

			EnsureColumnTypeEquality(relationId, targetRelationId, distributionColumnVar,
									 DistPartitionKeyOrError(targetRelationId));
		}
	}

	if (!(replicationModel == REPLICATION_MODEL_2PC ||
		  replicationModel == REPLICATION_MODEL_STREAMING ||
		  replicationModel == REPLICATION_MODEL_COORDINATOR))
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Metadata syncing is only allowed for "
							   "known replication models.")));
	}

	if (distributionMethod == DISTRIBUTE_BY_HASH &&
		replicationModel != REPLICATION_MODEL_STREAMING)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Hash distributed tables can only have '%c' "
							   "as the replication model.",
							   REPLICATION_MODEL_STREAMING)));
	}

	if (distributionMethod == DISTRIBUTE_BY_NONE &&
		!(replicationModel == REPLICATION_MODEL_STREAMING ||
		  replicationModel == REPLICATION_MODEL_2PC))
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Local or reference tables can only have '%c' or '%c' "
							   "as the replication model.",
							   REPLICATION_MODEL_STREAMING, REPLICATION_MODEL_2PC)));
	}
}


/*
 * citus_internal_add_shard_metadata is an internal UDF to
 * add a row to pg_dist_shard.
 */
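/*
 * Illustrative call (the relation, shard id, and range values are
 * hypothetical); the UDF is normally invoked by the coordinator within a
 * distributed transaction rather than run by hand:
 *
 *   SELECT citus_internal_add_shard_metadata(
 *              'dist_table'::regclass, 102040, 't', '-2147483648', '-1');
 */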
Datum
citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	PG_ENSURE_ARGNOTNULL(0, "relation");
	Oid relationId = PG_GETARG_OID(0);

	PG_ENSURE_ARGNOTNULL(1, "shard id");
	int64 shardId = PG_GETARG_INT64(1);

	PG_ENSURE_ARGNOTNULL(2, "storage type");
	char storageType = PG_GETARG_CHAR(2);

	text *shardMinValue = NULL;
	if (!PG_ARGISNULL(3))
	{
		shardMinValue = PG_GETARG_TEXT_P(3);
	}

	text *shardMaxValue = NULL;
	if (!PG_ARGISNULL(4))
	{
		shardMaxValue = PG_GETARG_TEXT_P(4);
	}

	/* only the owner of the table (or superuser) is allowed to add the Citus metadata */
	EnsureTableOwner(relationId);

	/* we want to serialize all the metadata changes to this table */
	LockRelationOid(relationId, ShareUpdateExclusiveLock);

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();

		/*
		 * Even if the table owner is a malicious user and the shard metadata is
		 * not sane, the user can only affect their own tables. Given that the
		 * user is the owner of the table, we should allow it.
		 */
		EnsureShardMetadataIsSane(relationId, shardId, storageType, shardMinValue,
								  shardMaxValue);
	}

	InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue);

	PG_RETURN_VOID();
}


/*
 * EnsureCoordinatorInitiatedOperation is a helper function which ensures that
 * the execution is initiated by the coordinator on a worker node.
 */
static void
EnsureCoordinatorInitiatedOperation(void)
{
	/*
	 * We are restricting the operation to only MX workers with the local group id
	 * check. The other two checks are to ensure that the operation is initiated
	 * by the coordinator.
	 */
	if (!IsCitusInitiatedRemoteBackend() || !MyBackendIsInDisributedTransaction() ||
		GetLocalGroupId() == COORDINATOR_GROUP_ID)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("This is an internal Citus function that can only be "
							   "used in a distributed transaction")));
	}
}


/*
 * EnsureShardMetadataIsSane ensures that the input values are safe
 * for inserting into pg_dist_shard metadata.
 */
static void
EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
						  text *shardMinValue, text *shardMaxValue)
{
	if (shardId <= INVALID_SHARD_ID)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Invalid shard id: %ld", shardId)));
	}

	if (!(storageType == SHARD_STORAGE_TABLE ||
		  storageType == SHARD_STORAGE_FOREIGN ||
		  storageType == SHARD_STORAGE_COLUMNAR))
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Invalid shard storage type: %c", storageType)));
	}

	char partitionMethod = PartitionMethodViaCatalog(relationId);
	if (partitionMethod == DISTRIBUTE_BY_INVALID)
	{
		/* connection from the coordinator operating on a shard */
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("The relation \"%s\" does not have a valid "
							   "entry in pg_dist_partition.",
							   get_rel_name(relationId))));
	}
	else if (!(partitionMethod == DISTRIBUTE_BY_HASH ||
			   partitionMethod == DISTRIBUTE_BY_NONE))
	{
		/* connection from the coordinator operating on a shard */
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Metadata syncing is only allowed for hash, "
							   "reference and local tables: %c", partitionMethod)));
	}

	List *distShardTupleList = LookupDistShardTuples(relationId);
	if (partitionMethod == DISTRIBUTE_BY_NONE)
	{
		if (shardMinValue != NULL || shardMaxValue != NULL)
		{
			char *relationName = get_rel_name(relationId);
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("Shards of reference or local table \"%s\" should "
								   "have NULL shard ranges", relationName)));
		}
		else if (list_length(distShardTupleList) != 0)
		{
			char *relationName = get_rel_name(relationId);
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("relation \"%s\" already has at least one shard, "
								   "adding more is not allowed", relationName)));
		}
	}
	else if (partitionMethod == DISTRIBUTE_BY_HASH)
	{
		if (shardMinValue == NULL || shardMaxValue == NULL)
		{
			char *relationName = get_rel_name(relationId);
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("Shards of hash distributed table \"%s\" "
								   "cannot have NULL shard ranges", relationName)));
		}

		char *shardMinValueString = text_to_cstring(shardMinValue);
		char *shardMaxValueString = text_to_cstring(shardMaxValue);

		/* pg_strtoint32 does the syntax and out of bound checks for us */
		int32 shardMinValueInt = pg_strtoint32(shardMinValueString);
		int32 shardMaxValueInt = pg_strtoint32(shardMaxValueString);

		if (shardMinValueInt > shardMaxValueInt)
		{
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("shardMinValue=%d is greater than "
								   "shardMaxValue=%d for table \"%s\", which is "
								   "not allowed", shardMinValueInt,
								   shardMaxValueInt, get_rel_name(relationId))));
		}

		/*
		 * We are only dealing with hash distributed tables, that's why we
		 * can hard code the data type and typemod.
		 */
		const int intervalTypeId = INT4OID;
		const int intervalTypeMod = -1;

		Relation distShardRelation = table_open(DistShardRelationId(), AccessShareLock);
		TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation);

		FmgrInfo *shardIntervalCompareFunction =
			GetFunctionInfo(intervalTypeId, BTREE_AM_OID, BTORDER_PROC);

		HeapTuple shardTuple = NULL;
		foreach_ptr(shardTuple, distShardTupleList)
		{
			ShardInterval *shardInterval =
				TupleToShardInterval(shardTuple, distShardTupleDesc,
									 intervalTypeId, intervalTypeMod);

			Datum firstMin = Int32GetDatum(shardMinValueInt);
			Datum firstMax = Int32GetDatum(shardMaxValueInt);
			Datum secondMin = shardInterval->minValue;
			Datum secondMax = shardInterval->maxValue;
			Oid collationId = InvalidOid;

			/*
			 * This is an unexpected case as we are reading the metadata, which has
			 * already been verified for being not NULL. Still, let's be extra
			 * cautious to avoid any crashes.
			 */
			if (!shardInterval->minValueExists || !shardInterval->maxValueExists)
			{
				char *relationName = get_rel_name(relationId);
				ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
								errmsg("Shards of hash distributed table \"%s\" "
									   "cannot have NULL shard ranges", relationName)));
			}

			if (ShardIntervalsOverlapWithParams(firstMin, firstMax, secondMin, secondMax,
												shardIntervalCompareFunction,
												collationId))
			{
				ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
								errmsg("Shard intervals overlap for table \"%s\": "
									   "%ld and %ld", get_rel_name(relationId),
									   shardId, shardInterval->shardId)));
			}
		}

		table_close(distShardRelation, NoLock);
	}
}


/*
 * citus_internal_add_placement_metadata is an internal UDF to
 * add a row to pg_dist_placement.
 */
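/*
 * Illustrative call (the shard id, group id, and placement id values are
 * hypothetical; the shard state argument must correspond to
 * SHARD_STATE_ACTIVE). The UDF is normally invoked by the coordinator within
 * a distributed transaction:
 *
 *   SELECT citus_internal_add_placement_metadata(102040, 1, 0, 2, 100040);
 */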
Datum
citus_internal_add_placement_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	int64 shardId = PG_GETARG_INT64(0);
	int32 shardState = PG_GETARG_INT32(1);
	int64 shardLength = PG_GETARG_INT64(2);
	int32 groupId = PG_GETARG_INT32(3);
	int64 placementId = PG_GETARG_INT64(4);

	bool missingOk = false;
	Oid relationId = LookupShardRelationFromCatalog(shardId, missingOk);

	/* only the owner of the table is allowed to modify the metadata */
	EnsureTableOwner(relationId);

	/* we want to serialize all the metadata changes to this table */
	LockRelationOid(relationId, ShareUpdateExclusiveLock);

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();

		/*
		 * Even if the table owner is a malicious user, as long as the shard placements
		 * fit into the basic requirements of Citus metadata, the user can only affect
		 * their own tables. Given that the user is the owner of the table, we should
		 * allow it.
		 */
		EnsureShardPlacementMetadataIsSane(relationId, shardId, placementId, shardState,
										   shardLength, groupId);
	}

	InsertShardPlacementRow(shardId, placementId, shardState, shardLength, groupId);

	PG_RETURN_VOID();
}


/*
 * EnsureShardPlacementMetadataIsSane ensures that the input parameters for
 * the shard placement metadata are sane.
 */
static void
EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId, int64 placementId,
								   int32 shardState, int64 shardLength, int32 groupId)
{
	/* we have just read the metadata, so we are sure that the shard exists */
	Assert(ShardExists(shardId));

	if (placementId <= INVALID_PLACEMENT_ID)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Shard placement has invalid placement id "
							   "(%ld) for shard(%ld)", placementId, shardId)));
	}

	if (shardState != SHARD_STATE_ACTIVE)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Invalid shard state: %d", shardState)));
	}

	bool nodeIsInMetadata = false;
	WorkerNode *workerNode =
		PrimaryNodeForGroup(groupId, &nodeIsInMetadata);
	if (!workerNode)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Node with group id %d for shard placement "
							   "%ld does not exist", groupId, shardId)));
	}
}


/*
 * ShouldSkipMetadataChecks returns true if the current user is allowed to
 * make manual metadata changes, in which case the sanity checks applied by
 * the citus_internal_... UDFs are skipped.
 */
static bool
ShouldSkipMetadataChecks(void)
{
	if (strcmp(EnableManualMetadataChangesForUser, "") != 0)
	{
		/*
		 * EnableManualMetadataChangesForUser is a GUC which
		 * can be changed by a superuser. We use this GUC as
		 * a safety belt in case the current metadata checks are
		 * too restrictive, so that the operator can allow users to
		 * skip the checks.
		 */

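		/*
		 * For reference, the setting would typically be applied cluster-wide,
		 * e.g. (assuming the GUC is exposed as
		 * citus.enable_manual_metadata_changes_for_user):
		 *
		 *   ALTER SYSTEM SET citus.enable_manual_metadata_changes_for_user
		 *       TO 'some_trusted_role';
		 */
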
		/*
		 * Make sure that the user exists, and print it to prevent any
		 * optimization skipping the get_role_oid call.
		 */
		bool missingOK = false;
		Oid allowedUserId = get_role_oid(EnableManualMetadataChangesForUser, missingOK);
		if (allowedUserId == GetUserId())
		{
			return true;
		}
	}

	return false;
}


/*
 * citus_internal_update_placement_metadata is an internal UDF to
 * update a row in pg_dist_placement.
 */
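/*
 * Illustrative call (the shard id and group ids are hypothetical), moving the
 * placement of shard 102040 from group 2 to group 3; the UDF is normally
 * invoked by the coordinator within a distributed transaction:
 *
 *   SELECT citus_internal_update_placement_metadata(102040, 2, 3);
 */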
Datum
citus_internal_update_placement_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	int64 shardId = PG_GETARG_INT64(0);
	int32 sourceGroupId = PG_GETARG_INT32(1);
	int32 targetGroupId = PG_GETARG_INT32(2);

	ShardPlacement *placement = NULL;
	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();

		if (!ShardExists(shardId))
		{
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("Shard id does not exist: %ld", shardId)));
		}

		bool missingOk = false;
		EnsureShardOwner(shardId, missingOk);

		/*
		 * This function ensures that the source group exists, hence we
		 * call it from this code-block.
		 */
		placement = ActiveShardPlacementOnGroup(sourceGroupId, shardId);

		bool nodeIsInMetadata = false;
		WorkerNode *workerNode =
			PrimaryNodeForGroup(targetGroupId, &nodeIsInMetadata);
		if (!workerNode)
		{
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("Node with group id %d for shard placement "
								   "%ld does not exist", targetGroupId, shardId)));
		}
	}
	else
	{
		placement = ActiveShardPlacementOnGroup(sourceGroupId, shardId);
	}

	/*
	 * Updating pg_dist_placement ensures that the node with targetGroupId
	 * exists and this is the only placement on that group.
	 */
	if (placement == NULL)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Active placement for shard %ld is not "
							   "found on group:%d", shardId, targetGroupId)));
	}

	UpdatePlacementGroupId(placement->placementId, targetGroupId);

	PG_RETURN_VOID();
}


/*
 * citus_internal_delete_shard_metadata is an internal UDF to
 * delete a row in pg_dist_shard and corresponding placement rows
 * from pg_dist_shard_placement.
 */
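/*
 * Illustrative call (the shard id is hypothetical); the UDF is normally
 * invoked by the coordinator within a distributed transaction:
 *
 *   SELECT citus_internal_delete_shard_metadata(102040);
 */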
Datum
citus_internal_delete_shard_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	int64 shardId = PG_GETARG_INT64(0);

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();

		if (!ShardExists(shardId))
		{
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("Shard id does not exist: %ld", shardId)));
		}

		bool missingOk = false;
		EnsureShardOwner(shardId, missingOk);
	}

	List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
	ShardPlacement *shardPlacement = NULL;
	foreach_ptr(shardPlacement, shardPlacementList)
	{
		DeleteShardPlacementRow(shardPlacement->placementId);
	}

	DeleteShardRow(shardId);

	PG_RETURN_VOID();
}


/*
 * citus_internal_update_relation_colocation is an internal UDF to
 * update the colocation id of a relation in pg_dist_partition.
 */
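/*
 * Illustrative call (the relation and colocation id are hypothetical); the UDF
 * is normally invoked by the coordinator within a distributed transaction:
 *
 *   SELECT citus_internal_update_relation_colocation('dist_table'::regclass, 1390009);
 */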
Datum
citus_internal_update_relation_colocation(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	Oid relationId = PG_GETARG_OID(0);
	uint32 targetColocationId = PG_GETARG_UINT32(1);

	EnsureTableOwner(relationId);

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();

		/* ensure that the table is in pg_dist_partition */
		char partitionMethod = PartitionMethodViaCatalog(relationId);
		if (partitionMethod == DISTRIBUTE_BY_INVALID)
		{
			/* connection from the coordinator operating on a shard */
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("The relation \"%s\" does not have a valid "
								   "entry in pg_dist_partition.",
								   get_rel_name(relationId))));
		}
		else if (partitionMethod != DISTRIBUTE_BY_HASH)
		{
			/* connection from the coordinator operating on a shard */
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("Updating colocation ids is only allowed for hash "
								   "distributed tables: %c", partitionMethod)));
		}

		int count = 1;
		List *targetColocatedTableList =
			ColocationGroupTableList(targetColocationId, count);

		if (list_length(targetColocatedTableList) == 0)
		{
			/* the table is colocated with none, so nothing to check */
		}
		else
		{
			Oid targetRelationId = linitial_oid(targetColocatedTableList);

			ErrorIfShardPlacementsNotColocated(relationId, targetRelationId);
			CheckReplicationModel(relationId, targetRelationId);
			CheckDistributionColumnType(relationId, targetRelationId);
		}
	}

	bool localOnly = true;
	UpdateRelationColocationGroup(relationId, targetColocationId, localOnly);

	PG_RETURN_VOID();
}