1 /*
2 * node_metadata.c
3 * Functions that operate on pg_dist_node
4 *
5 * Copyright (c) Citus Data, Inc.
6 */
7 #include "postgres.h"
8 #include "miscadmin.h"
9 #include "funcapi.h"
10 #include "utils/plancache.h"
11
12
13 #include "access/genam.h"
14 #include "access/heapam.h"
15 #include "access/htup.h"
16 #include "access/htup_details.h"
17 #include "access/skey.h"
18 #include "access/skey.h"
19 #include "access/tupmacs.h"
20 #include "access/xact.h"
21 #include "catalog/indexing.h"
22 #include "catalog/namespace.h"
23 #include "commands/sequence.h"
24 #include "distributed/citus_acquire_lock.h"
25 #include "distributed/citus_safe_lib.h"
26 #include "distributed/colocation_utils.h"
27 #include "distributed/commands.h"
28 #include "distributed/commands/utility_hook.h"
29 #include "distributed/connection_management.h"
30 #include "distributed/maintenanced.h"
31 #include "distributed/coordinator_protocol.h"
32 #include "distributed/metadata_utility.h"
33 #include "distributed/metadata/distobject.h"
34 #include "distributed/metadata_cache.h"
35 #include "distributed/metadata_sync.h"
36 #include "distributed/multi_join_order.h"
37 #include "distributed/multi_router_planner.h"
38 #include "distributed/pg_dist_node.h"
39 #include "distributed/reference_table_utils.h"
40 #include "distributed/remote_commands.h"
41 #include "distributed/resource_lock.h"
42 #include "distributed/shardinterval_utils.h"
43 #include "distributed/shared_connection_stats.h"
44 #include "distributed/string_utils.h"
45 #include "distributed/transaction_recovery.h"
46 #include "distributed/version_compat.h"
47 #include "distributed/worker_manager.h"
48 #include "distributed/worker_transaction.h"
49 #include "lib/stringinfo.h"
50 #include "postmaster/postmaster.h"
51 #include "storage/bufmgr.h"
52 #include "storage/lmgr.h"
53 #include "storage/lock.h"
54 #include "storage/fd.h"
55 #include "utils/builtins.h"
56 #include "utils/fmgroids.h"
57 #include "utils/lsyscache.h"
58 #include "utils/rel.h"
59 #include "utils/relcache.h"
60
61 #define INVALID_GROUP_ID -1
62
63 /* default group size */
64 int GroupSize = 1;
65
66 /* config variable managed via guc.c */
67 char *CurrentCluster = "default";
68
69 /*
70 * Config variable to control whether we should replicate reference tables on
71 * node activation or defer it to shard creation.
72 */
73 bool ReplicateReferenceTablesOnActivate = true;
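/*
 * As an illustrative sketch (the GUC name is an assumption based on the
 * variable above; the GUC itself is registered elsewhere), this behavior
 * can be toggled from SQL with:
 *
 *   SET citus.replicate_reference_tables_on_activate TO off;
 */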
74
75 /* did current transaction modify pg_dist_node? */
76 bool TransactionModifiedNodeMetadata = false;
77
78 typedef struct NodeMetadata
79 {
80 int32 groupId;
81 char *nodeRack;
82 bool hasMetadata;
83 bool metadataSynced;
84 bool isActive;
85 Oid nodeRole;
86 bool shouldHaveShards;
87 char *nodeCluster;
88 } NodeMetadata;
89
90 /* local function forward declarations */
91 static int ActivateNode(char *nodeName, int nodePort);
92 static bool CanRemoveReferenceTablePlacements(void);
93 static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
94 static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata
95 *nodeMetadata, bool *nodeAlreadyExists);
96 static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive);
97 static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort);
98 static int32 GetNextGroupId(void);
99 static int GetNextNodeId(void);
100 static void InsertPlaceholderCoordinatorRecord(void);
101 static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
102 *nodeMetadata);
103 static void DeleteNodeRow(char *nodename, int32 nodeport);
104 static void SetUpDistributedTableDependencies(WorkerNode *workerNode);
105 static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
106 static void PropagateNodeWideObjects(WorkerNode *newWorkerNode);
107 static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
108 static bool NodeIsLocal(WorkerNode *worker);
109 static void SetLockTimeoutLocally(int32 lock_cooldown);
110 static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
111 static bool UnsetMetadataSyncedForAll(void);
112 static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
113 int columnIndex,
114 Datum value);
115 static char * NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata);
116 static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced);
117 static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
118 char *field);
119 static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
120 static void RemoveOldShardPlacementForNodeGroup(int groupId);
121
122 /* declarations for dynamic loading */
123 PG_FUNCTION_INFO_V1(citus_set_coordinator_host);
124 PG_FUNCTION_INFO_V1(citus_add_node);
125 PG_FUNCTION_INFO_V1(master_add_node);
126 PG_FUNCTION_INFO_V1(citus_add_inactive_node);
127 PG_FUNCTION_INFO_V1(master_add_inactive_node);
128 PG_FUNCTION_INFO_V1(citus_add_secondary_node);
129 PG_FUNCTION_INFO_V1(master_add_secondary_node);
130 PG_FUNCTION_INFO_V1(citus_set_node_property);
131 PG_FUNCTION_INFO_V1(master_set_node_property);
132 PG_FUNCTION_INFO_V1(citus_remove_node);
133 PG_FUNCTION_INFO_V1(master_remove_node);
134 PG_FUNCTION_INFO_V1(citus_disable_node);
135 PG_FUNCTION_INFO_V1(master_disable_node);
136 PG_FUNCTION_INFO_V1(citus_activate_node);
137 PG_FUNCTION_INFO_V1(master_activate_node);
138 PG_FUNCTION_INFO_V1(citus_update_node);
139 PG_FUNCTION_INFO_V1(master_update_node);
140 PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
141
142
143 /*
144 * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
145 * sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK.
146 */
147 static NodeMetadata
148 DefaultNodeMetadata()
149 {
150 NodeMetadata nodeMetadata;
151
152 /* ensure uninitialized padding doesn't escape the function */
153 memset_struct_0(nodeMetadata);
154 nodeMetadata.nodeRack = WORKER_DEFAULT_RACK;
155 nodeMetadata.shouldHaveShards = true;
156 nodeMetadata.groupId = INVALID_GROUP_ID;
157
158 return nodeMetadata;
159 }
160
161
162 /*
163 * citus_set_coordinator_host configures the hostname and port through which worker
164 * nodes can connect to the coordinator.
165 */
166 Datum
167 citus_set_coordinator_host(PG_FUNCTION_ARGS)
168 {
169 CheckCitusVersion(ERROR);
170
171 text *nodeName = PG_GETARG_TEXT_P(0);
172 int32 nodePort = PG_GETARG_INT32(1);
173 char *nodeNameString = text_to_cstring(nodeName);
174
175 NodeMetadata nodeMetadata = DefaultNodeMetadata();
176 nodeMetadata.groupId = 0;
177 nodeMetadata.shouldHaveShards = false;
178 nodeMetadata.nodeRole = PG_GETARG_OID(2);
179
180 Name nodeClusterName = PG_GETARG_NAME(3);
181 nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
182
183 /* prevent concurrent modification */
184 LockRelationOid(DistNodeRelationId(), RowShareLock);
185
186 bool isCoordinatorInMetadata = false;
187 WorkerNode *coordinatorNode = PrimaryNodeForGroup(COORDINATOR_GROUP_ID,
188 &isCoordinatorInMetadata);
189 if (!isCoordinatorInMetadata)
190 {
191 bool nodeAlreadyExists = false;
192
193 /* add the coordinator to pg_dist_node if it was not already added */
194 AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
195 &nodeAlreadyExists);
196
197 /* we just checked */
198 Assert(!nodeAlreadyExists);
199 }
200 else
201 {
202 /*
203 * since AddNodeMetadata takes an exclusive lock on pg_dist_node, we
204 * do not need to worry about concurrent changes (e.g. deletion) and
205 * can proceed to update immediately.
206 */
207
208 UpdateNodeLocation(coordinatorNode->nodeId, nodeNameString, nodePort);
209
210 /* clear cached plans that have the old host/port */
211 ResetPlanCache();
212 }
213
214 TransactionModifiedNodeMetadata = true;
215
216 PG_RETURN_VOID();
217 }
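/*
 * Illustrative usage; the hostname and port are placeholders, and the
 * noderole/nodecluster arguments are assumed to have SQL-level defaults:
 *
 *   SELECT citus_set_coordinator_host('coord.example.com', 5432);
 */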
218
219
220 /*
221 * citus_add_node function adds a new node to the cluster and returns its id. It also
222 * replicates all reference tables to the new node.
223 */
224 Datum
225 citus_add_node(PG_FUNCTION_ARGS)
226 {
227 CheckCitusVersion(ERROR);
228
229 text *nodeName = PG_GETARG_TEXT_P(0);
230 int32 nodePort = PG_GETARG_INT32(1);
231 char *nodeNameString = text_to_cstring(nodeName);
232
233 NodeMetadata nodeMetadata = DefaultNodeMetadata();
234 bool nodeAlreadyExists = false;
235 nodeMetadata.groupId = PG_GETARG_INT32(2);
236
237 /*
238 * During tests this function is called before nodeRole and nodeCluster have been
239 * created.
240 */
241 if (PG_NARGS() == 3)
242 {
243 nodeMetadata.nodeRole = InvalidOid;
244 nodeMetadata.nodeCluster = "default";
245 }
246 else
247 {
248 Name nodeClusterName = PG_GETARG_NAME(4);
249 nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
250
251 nodeMetadata.nodeRole = PG_GETARG_OID(3);
252 }
253
254 if (nodeMetadata.groupId == COORDINATOR_GROUP_ID)
255 {
256 /* by default, we add the coordinator without shards */
257 nodeMetadata.shouldHaveShards = false;
258 }
259
260 int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
261 &nodeAlreadyExists);
262 TransactionModifiedNodeMetadata = true;
263
264 /*
265 * After adding new node, if the node did not already exist, we will activate
266 * the node. This means we will replicate all reference tables to the new
267 * node.
268 */
269 if (!nodeAlreadyExists)
270 {
271 ActivateNode(nodeNameString, nodePort);
272 }
273
274 PG_RETURN_INT32(nodeId);
275 }
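/*
 * Illustrative usage; the hostname and port are placeholders, and groupid,
 * noderole and nodecluster are assumed to have SQL-level defaults:
 *
 *   SELECT citus_add_node('worker-1.example.com', 5432);
 */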
276
277
278 /*
279 * master_add_node is a wrapper function for old UDF name.
280 */
281 Datum
282 master_add_node(PG_FUNCTION_ARGS)
283 {
284 return citus_add_node(fcinfo);
285 }
286
287
288 /*
289 * citus_add_inactive_node function adds a new node to the cluster as inactive node
290 * and returns id of the newly added node. It does not replicate reference
291 * tables to the new node, it only adds new node to the pg_dist_node table.
292 */
293 Datum
294 citus_add_inactive_node(PG_FUNCTION_ARGS)
295 {
296 CheckCitusVersion(ERROR);
297
298 text *nodeName = PG_GETARG_TEXT_P(0);
299 int32 nodePort = PG_GETARG_INT32(1);
300 char *nodeNameString = text_to_cstring(nodeName);
301 Name nodeClusterName = PG_GETARG_NAME(4);
302
303 NodeMetadata nodeMetadata = DefaultNodeMetadata();
304 bool nodeAlreadyExists = false;
305 nodeMetadata.groupId = PG_GETARG_INT32(2);
306 nodeMetadata.nodeRole = PG_GETARG_OID(3);
307 nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
308
309 if (nodeMetadata.groupId == COORDINATOR_GROUP_ID)
310 {
311 ereport(ERROR, (errmsg("coordinator node cannot be added as inactive node")));
312 }
313
314 int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
315 &nodeAlreadyExists);
316 TransactionModifiedNodeMetadata = true;
317
318 PG_RETURN_INT32(nodeId);
319 }
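/*
 * Illustrative usage; the hostname and port are placeholders, and the
 * trailing arguments are assumed to default at the SQL level:
 *
 *   SELECT citus_add_inactive_node('worker-2.example.com', 5432);
 */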
320
321
322 /*
323 * master_add_inactive_node is a wrapper function for old UDF name.
324 */
325 Datum
326 master_add_inactive_node(PG_FUNCTION_ARGS)
327 {
328 return citus_add_inactive_node(fcinfo);
329 }
330
331
332 /*
333 * citus_add_secondary_node adds a new secondary node to the cluster. It takes as
334 * arguments the hostname and port of the primary node it should share a group with.
335 */
336 Datum
337 citus_add_secondary_node(PG_FUNCTION_ARGS)
338 {
339 CheckCitusVersion(ERROR);
340
341 text *nodeName = PG_GETARG_TEXT_P(0);
342 int32 nodePort = PG_GETARG_INT32(1);
343 char *nodeNameString = text_to_cstring(nodeName);
344
345 text *primaryName = PG_GETARG_TEXT_P(2);
346 int32 primaryPort = PG_GETARG_INT32(3);
347 char *primaryNameString = text_to_cstring(primaryName);
348
349 Name nodeClusterName = PG_GETARG_NAME(4);
350 NodeMetadata nodeMetadata = DefaultNodeMetadata();
351 bool nodeAlreadyExists = false;
352
353 nodeMetadata.groupId = GroupForNode(primaryNameString, primaryPort);
354 nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
355 nodeMetadata.nodeRole = SecondaryNodeRoleId();
356 nodeMetadata.isActive = true;
357
358 int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
359 &nodeAlreadyExists);
360 TransactionModifiedNodeMetadata = true;
361
362 PG_RETURN_INT32(nodeId);
363 }
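/*
 * Illustrative usage, adding a secondary into the group of an existing
 * primary; hostnames and ports are placeholders:
 *
 *   SELECT citus_add_secondary_node('follower-1.example.com', 5432,
 *                                   'worker-1.example.com', 5432);
 */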
364
365
366 /*
367 * master_add_secondary_node is a wrapper function for old UDF name.
368 */
369 Datum
370 master_add_secondary_node(PG_FUNCTION_ARGS)
371 {
372 return citus_add_secondary_node(fcinfo);
373 }
374
375
376 /*
377 * citus_remove_node function removes the provided node from the pg_dist_node table of
378 * the master node and all nodes with metadata.
379 * The call to citus_remove_node should be done by the superuser and the specified
380 * node should not have any active placements.
381 * This function also deletes all reference table placements belonging to the given node
382 * from pg_dist_placement, but it does not drop the actual placements on the node. In the
383 * case of re-adding the node, citus_add_node first drops and re-creates the reference tables.
384 */
385 Datum
386 citus_remove_node(PG_FUNCTION_ARGS)
387 {
388 CheckCitusVersion(ERROR);
389
390 text *nodeNameText = PG_GETARG_TEXT_P(0);
391 int32 nodePort = PG_GETARG_INT32(1);
392
393 RemoveNodeFromCluster(text_to_cstring(nodeNameText), nodePort);
394 TransactionModifiedNodeMetadata = true;
395
396 PG_RETURN_VOID();
397 }
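/*
 * Illustrative usage; the hostname and port are placeholders:
 *
 *   SELECT citus_remove_node('worker-1.example.com', 5432);
 */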
398
399
400 /*
401 * master_remove_node is a wrapper function for old UDF name.
402 */
403 Datum
404 master_remove_node(PG_FUNCTION_ARGS)
405 {
406 return citus_remove_node(fcinfo);
407 }
408
409
410 /*
411 * citus_disable_node function sets the isactive value of the provided node to inactive
412 * at the coordinator and all nodes with metadata, regardless of the node having an
413 * active shard placement.
414 *
415 * The call to citus_disable_node must be done by the superuser.
416 *
417 * This function also deletes all reference table placements belonging to the given node
418 * from pg_dist_placement, but it does not drop the actual placements on the node. In the
419 * case of re-activating the node, citus_add_node first drops and re-creates the reference
420 * tables.
421 */
422 Datum
423 citus_disable_node(PG_FUNCTION_ARGS)
424 {
425 text *nodeNameText = PG_GETARG_TEXT_P(0);
426 int32 nodePort = PG_GETARG_INT32(1);
427 char *nodeName = text_to_cstring(nodeNameText);
428 WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
429 bool isActive = false;
430 bool onlyConsiderActivePlacements = false;
431 MemoryContext savedContext = CurrentMemoryContext;
432
433 PG_TRY();
434 {
435 if (NodeIsPrimary(workerNode))
436 {
437 /*
438 * Delete reference table placements so they are not taken into account
439 * for the check if there are placements after this.
440 */
441 DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId);
442
443 if (NodeGroupHasShardPlacements(workerNode->groupId,
444 onlyConsiderActivePlacements))
445 {
446 ereport(NOTICE, (errmsg(
447 "Node %s:%d has active shard placements. Some queries "
448 "may fail after this operation. Use "
449 "SELECT citus_activate_node('%s', %d) to activate this "
450 "node back.",
451 workerNode->workerName, nodePort,
452 workerNode->workerName,
453 nodePort)));
454 }
455 }
456
457 SetNodeState(nodeName, nodePort, isActive);
458 TransactionModifiedNodeMetadata = true;
459 }
460 PG_CATCH();
461 {
462 /* CopyErrorData() requires (CurrentMemoryContext != ErrorContext) */
463 MemoryContextSwitchTo(savedContext);
464 ErrorData *edata = CopyErrorData();
465
466 if (ClusterHasKnownMetadataWorkers())
467 {
468 ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
469 errmsg("Disabling %s:%d failed", workerNode->workerName,
470 nodePort),
471 errdetail("%s", edata->message),
472 errhint(
473 "If you are using MX, try stop_metadata_sync_to_node(hostname, port) "
474 "for nodes that are down before disabling them.")));
475 }
476 else
477 {
478 ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
479 errmsg("Disabling %s:%d failed", workerNode->workerName,
480 nodePort),
481 errdetail("%s", edata->message)));
482 }
483 }
484 PG_END_TRY();
485
486 PG_RETURN_VOID();
487 }
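/*
 * Illustrative usage; the hostname and port are placeholders:
 *
 *   SELECT citus_disable_node('worker-1.example.com', 5432);
 */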
488
489
490 /*
491 * master_disable_node is a wrapper function for old UDF name.
492 */
493 Datum
494 master_disable_node(PG_FUNCTION_ARGS)
495 {
496 return citus_disable_node(fcinfo);
497 }
498
499
500 /*
501 * citus_set_node_property sets a property of the node
502 */
503 Datum
504 citus_set_node_property(PG_FUNCTION_ARGS)
505 {
506 text *nodeNameText = PG_GETARG_TEXT_P(0);
507 int32 nodePort = PG_GETARG_INT32(1);
508 text *propertyText = PG_GETARG_TEXT_P(2);
509 bool value = PG_GETARG_BOOL(3);
510
511 WorkerNode *workerNode = ModifiableWorkerNode(text_to_cstring(nodeNameText),
512 nodePort);
513
514 if (strcmp(text_to_cstring(propertyText), "shouldhaveshards") == 0)
515 {
516 SetShouldHaveShards(workerNode, value);
517 }
518 else
519 {
520 ereport(ERROR, (errmsg(
521 "only the 'shouldhaveshards' property can be set using this function"
522 )));
523 }
524
525 TransactionModifiedNodeMetadata = true;
526
527 PG_RETURN_VOID();
528 }
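/*
 * Illustrative usage, draining new shards away from a node; the hostname
 * and port are placeholders:
 *
 *   SELECT citus_set_node_property('worker-1.example.com', 5432,
 *                                  'shouldhaveshards', false);
 */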
529
530
531 /*
532 * master_set_node_property is a wrapper function for old UDF name.
533 */
534 Datum
535 master_set_node_property(PG_FUNCTION_ARGS)
536 {
537 return citus_set_node_property(fcinfo);
538 }
539
540
541 /*
542 * SetUpDistributedTableDependencies sets up the following on a node if it's
543 * a primary node that currently stores data:
544 * - All dependencies (e.g., types, schemas)
545 * - Reference tables, because they are needed to handle queries efficiently.
546 * - Distributed functions
547 *
548 * Note that we do not create the distributed dependencies on the coordinator
549 * since all the dependencies should be present in the coordinator already.
550 */
551 static void
552 SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
553 {
554 if (NodeIsPrimary(newWorkerNode))
555 {
556 EnsureNoModificationsHaveBeenDone();
557
558 if (ShouldPropagate() && !NodeIsCoordinator(newWorkerNode))
559 {
560 PropagateNodeWideObjects(newWorkerNode);
561 ReplicateAllDependenciesToNode(newWorkerNode->workerName,
562 newWorkerNode->workerPort);
563 }
564 else if (!NodeIsCoordinator(newWorkerNode))
565 {
566 ereport(WARNING, (errmsg("citus.enable_object_propagation is off, not "
567 "creating distributed objects on worker"),
568 errdetail("distributed objects are only kept in sync when "
569 "citus.enable_object_propagation is set to on. "
570 "Newly activated nodes will not get these "
571 "objects created")));
572 }
573
574 if (ReplicateReferenceTablesOnActivate)
575 {
576 ReplicateAllReferenceTablesToNode(newWorkerNode->workerName,
577 newWorkerNode->workerPort);
578 }
579
580 /*
581 * Let the maintenance daemon do the hard work of syncing the metadata.
582 * We prefer this because otherwise node activation might fail within
583 * transaction blocks.
584 */
585 if (ClusterHasDistributedFunctionWithDistArgument())
586 {
587 SetWorkerColumnLocalOnly(newWorkerNode, Anum_pg_dist_node_hasmetadata,
588 BoolGetDatum(true));
589 TriggerMetadataSyncOnCommit();
590 }
591 }
592 }
593
594
595 /*
596 * PropagateNodeWideObjects is called during node activation to propagate any object that
597 * should be propagated for every node. These are generally not linked to any distributed
598 * object but change system-wide behaviour.
599 */
600 static void
601 PropagateNodeWideObjects(WorkerNode *newWorkerNode)
602 {
603 /* collect all commands */
604 List *ddlCommands = NIL;
605
606 if (EnableAlterRoleSetPropagation)
607 {
608 /*
609 * Get commands for database and postgres wide settings. Since these settings are not
610 * linked to any role that can be distributed, we need to distribute them separately.
611 */
612 List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid);
613 ddlCommands = list_concat(ddlCommands, alterRoleSetCommands);
614 }
615
616 if (list_length(ddlCommands) > 0)
617 {
618 /* if there are commands, wrap them between DISABLE/ENABLE_DDL_PROPAGATION */
619 ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands);
620 ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
621
622 /* send commands to the new worker */
623 SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName,
624 newWorkerNode->workerPort,
625 CitusExtensionOwnerName(),
626 ddlCommands);
627 }
628 }
629
630
631 /*
632 * ModifiableWorkerNode gets the requested WorkerNode and also gets locks
633 * required for modifying it. This fails if the node does not exist.
634 */
635 static WorkerNode *
636 ModifiableWorkerNode(const char *nodeName, int32 nodePort)
637 {
638 CheckCitusVersion(ERROR);
639 EnsureCoordinator();
640
641 /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
642 LockRelationOid(DistNodeRelationId(), ExclusiveLock);
643
644 WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
645 if (workerNode == NULL)
646 {
647 ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
648 }
649
650 return workerNode;
651 }
652
653
654 /*
655 * citus_activate_node UDF activates the given node. It sets the node's isactive
656 * value to active and replicates all reference tables to that node.
657 */
658 Datum
659 citus_activate_node(PG_FUNCTION_ARGS)
660 {
661 text *nodeNameText = PG_GETARG_TEXT_P(0);
662 int32 nodePort = PG_GETARG_INT32(1);
663
664 WorkerNode *workerNode = ModifiableWorkerNode(text_to_cstring(nodeNameText),
665 nodePort);
666 ActivateNode(workerNode->workerName, workerNode->workerPort);
667
668 TransactionModifiedNodeMetadata = true;
669
670 PG_RETURN_INT32(workerNode->nodeId);
671 }
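/*
 * Illustrative usage; the hostname and port are placeholders:
 *
 *   SELECT citus_activate_node('worker-1.example.com', 5432);
 */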
672
673
674 /*
675 * master_activate_node is a wrapper function for old UDF name.
676 */
677 Datum
678 master_activate_node(PG_FUNCTION_ARGS)
679 {
680 return citus_activate_node(fcinfo);
681 }
682
683
684 /*
685 * GroupForNode returns the group which a given node belongs to.
686 *
687 * It only works if the requested node is a part of CurrentCluster.
688 */
689 uint32
690 GroupForNode(char *nodeName, int nodePort)
691 {
692 WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);
693
694 if (workerNode == NULL)
695 {
696 ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
697 }
698
699 return workerNode->groupId;
700 }
701
702
703 /*
704 * NodeIsPrimaryAndRemote returns whether the argument represents a remote
705 * primary node.
706 */
707 bool
708 NodeIsPrimaryAndRemote(WorkerNode *worker)
709 {
710 return NodeIsPrimary(worker) && !NodeIsLocal(worker);
711 }
712
713
714 /*
715 * NodeIsPrimary returns whether the argument represents a primary node.
716 */
717 bool
718 NodeIsPrimary(WorkerNode *worker)
719 {
720 Oid primaryRole = PrimaryNodeRoleId();
721
722 /* if nodeRole does not yet exist, all nodes are primary nodes */
723 if (primaryRole == InvalidOid)
724 {
725 return true;
726 }
727
728 return worker->nodeRole == primaryRole;
729 }
730
731
732 /*
733 * NodeIsLocal returns whether the argument represents the local node.
734 */
735 static bool
736 NodeIsLocal(WorkerNode *worker)
737 {
738 return worker->groupId == GetLocalGroupId();
739 }
740
741
742 /*
743 * NodeIsSecondary returns whether the argument represents a secondary node.
744 */
745 bool
746 NodeIsSecondary(WorkerNode *worker)
747 {
748 Oid secondaryRole = SecondaryNodeRoleId();
749
750 /* if nodeRole does not yet exist, all nodes are primary nodes, so none is a secondary */
751 if (secondaryRole == InvalidOid)
752 {
753 return false;
754 }
755
756 return worker->nodeRole == secondaryRole;
757 }
758
759
760 /*
761 * NodeIsReadable returns whether we're allowed to send SELECT queries to this
762 * node.
763 */
764 bool
765 NodeIsReadable(WorkerNode *workerNode)
766 {
767 if (ReadFromSecondaries == USE_SECONDARY_NODES_NEVER &&
768 NodeIsPrimary(workerNode))
769 {
770 return true;
771 }
772
773 if (ReadFromSecondaries == USE_SECONDARY_NODES_ALWAYS &&
774 NodeIsSecondary(workerNode))
775 {
776 return true;
777 }
778
779 return false;
780 }
781
782
783 /*
784 * PrimaryNodeForGroup returns the (unique) primary in the specified group.
785 *
786 * If there are any nodes in the requested group and groupContainsNodes is not
787 * NULL, this function sets *groupContainsNodes to true.
788 */
789 WorkerNode *
790 PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes)
791 {
792 WorkerNode *workerNode = NULL;
793 HASH_SEQ_STATUS status;
794 HTAB *workerNodeHash = GetWorkerNodeHash();
795
796 hash_seq_init(&status, workerNodeHash);
797
798 while ((workerNode = hash_seq_search(&status)) != NULL)
799 {
800 int32 workerNodeGroupId = workerNode->groupId;
801 if (workerNodeGroupId != groupId)
802 {
803 continue;
804 }
805
806 if (groupContainsNodes != NULL)
807 {
808 *groupContainsNodes = true;
809 }
810
811 if (NodeIsPrimary(workerNode))
812 {
813 hash_seq_term(&status);
814 return workerNode;
815 }
816 }
817
818 return NULL;
819 }
820
821
822 /*
823 * ActivateNode activates the node with nodeName and nodePort. Currently, activation
824 * includes only replicating the reference tables and setting isactive column of the
825 * given node.
826 */
827 static int
828 ActivateNode(char *nodeName, int nodePort)
829 {
830 bool isActive = true;
831
832 /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
833 LockRelationOid(DistNodeRelationId(), ExclusiveLock);
834
835 WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive);
836
837 SetUpDistributedTableDependencies(newWorkerNode);
838 return newWorkerNode->nodeId;
839 }
840
841
842 /*
843 * citus_update_node moves the requested node to a different nodename and nodeport. It
844 * locks to ensure no queries are running concurrently, and is intended for customers who
845 * are running their own failover solution.
846 */
847 Datum
848 citus_update_node(PG_FUNCTION_ARGS)
849 {
850 CheckCitusVersion(ERROR);
851
852 int32 nodeId = PG_GETARG_INT32(0);
853
854 text *newNodeName = PG_GETARG_TEXT_P(1);
855 int32 newNodePort = PG_GETARG_INT32(2);
856
857 /*
858 * force is used when an update needs to happen regardless of conflicting locks. This
859 * feature is important to force the update during a failover due to failure, e.g. by
860 * a high-availability system such as pg_auto_failover. The strategy is to start a
861 * background worker that actively cancels backends holding conflicting locks with
862 * this backend.
863 *
864 * Defaults to false.
865 */
866 bool force = PG_GETARG_BOOL(3);
867 int32 lock_cooldown = PG_GETARG_INT32(4);
868
869 char *newNodeNameString = text_to_cstring(newNodeName);
870 List *placementList = NIL;
871 BackgroundWorkerHandle *handle = NULL;
872
873 WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString,
874 newNodePort);
875 if (workerNodeWithSameAddress != NULL)
876 {
877 /* a node with the given hostname and port already exists in the metadata */
878
879 if (workerNodeWithSameAddress->nodeId == nodeId)
880 {
881 /* it's the node itself, meaning this is a noop update */
882 PG_RETURN_VOID();
883 }
884 else
885 {
886 ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
887 errmsg("there is already another node with the specified "
888 "hostname and port")));
889 }
890 }
891
892 WorkerNode *workerNode = LookupNodeByNodeId(nodeId);
893 if (workerNode == NULL)
894 {
895 ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND),
896 errmsg("node %u not found", nodeId)));
897 }
898
899
900 /*
901 * If the node is a primary node we block reads and writes.
902 *
903 * This lock has two purposes:
904 *
905 * - Ensure buggy code in Citus doesn't cause failures when the
906 * nodename/nodeport of a node changes mid-query
907 *
908 * - Provide fencing during failover, after this function returns all
909 * connections will use the new node location.
910 *
911 * Drawback:
912 *
913 * - This function blocks until all previous queries have finished. This
914 * means that long-running queries will prevent failover.
915 *
916 * In case of node failure said long-running queries will fail in the end
917 * anyway as they will be unable to commit successfully on the failed
918 * machine. To cause quick failure of these queries use force => true
919 * during the invocation of citus_update_node to terminate conflicting
920 * backends proactively.
921 *
922 * It might be worth blocking reads to a secondary for the same reasons,
923 * though we currently only query secondaries on follower clusters
924 * where these locks will have no effect.
925 */
926 if (NodeIsPrimary(workerNode))
927 {
928 /*
929 * Before acquiring the locks, check whether we want a background worker to
930 * help us aggressively obtain the locks.
931 */
932 if (force)
933 {
934 handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
935 if (!handle)
936 {
937 /*
938 * We failed to start a background worker, which probably means that we exceeded
939 * max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
940 * to repeatedly throw an error because if citus_update_node is called to complete a
941 * failover then finishing is the only way to bring the cluster back up. Therefore we
942 * give up on killing other backends and simply wait for the lock. We do set
943 * lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
944 */
945 SetLockTimeoutLocally(lock_cooldown);
946 ereport(WARNING, (errmsg(
947 "could not start background worker to kill backends with conflicting"
948 " locks to force the update. Degrading to acquiring locks "
949 "with a lock time out."),
950 errhint(
951 "Increasing max_worker_processes might help.")));
952 }
953 }
954
955 placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
956 LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock);
957 }
958
959 /*
960 * if we have planned statements such as prepared statements, we should clear the cache so that
961 * the plan cache doesn't return the old nodename/nodeport.
962 */
963 ResetPlanCache();
964
965 UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
966
967 /* we should be able to find the new node from the metadata */
968 workerNode = FindWorkerNode(newNodeNameString, newNodePort);
969 Assert(workerNode->nodeId == nodeId);
970
971 /*
972 * Propagate the updated pg_dist_node entry to all metadata workers.
973 * citus-ha uses citus_update_node() in a prepared transaction, and
974 * we don't support coordinated prepared transactions, so we cannot
975 * propagate the changes to the worker nodes here. Instead we mark
976 * all metadata nodes as not-synced and ask maintenanced to do the
977 * propagation.
978 *
979 * It is possible that the maintenance daemon does the first resync too
980 * early, but that's fine, since this will start a retry loop with
981 * 5 second intervals until sync is complete.
982 */
983 if (UnsetMetadataSyncedForAll())
984 {
985 TriggerMetadataSyncOnCommit();
986 }
987
988 if (handle != NULL)
989 {
990 /*
991 * this will be called on memory context cleanup as well, if the worker has been
992 * terminated already this will be a noop
993 */
994 TerminateBackgroundWorker(handle);
995 }
996
997 TransactionModifiedNodeMetadata = true;
998
999 PG_RETURN_VOID();
1000 }
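/*
 * Illustrative usage during a failover; the node id, hostname and port are
 * placeholders, and force/lock_cooldown are assumed to have SQL-level
 * defaults:
 *
 *   SELECT citus_update_node(123, 'standby-1.example.com', 5432);
 */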
1001
1002
1003 /*
1004 * master_update_node is a wrapper function for old UDF name.
1005 */
1006 Datum
1007 master_update_node(PG_FUNCTION_ARGS)
1008 {
1009 return citus_update_node(fcinfo);
1010 }
1011
1012
1013 /*
1014 * SetLockTimeoutLocally sets the lock_timeout to the given value.
1015 * This setting is local.
1016 */
1017 static void
1018 SetLockTimeoutLocally(int32 lockCooldown)
1019 {
1020 set_config_option("lock_timeout", ConvertIntToString(lockCooldown),
1021 (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
1022 GUC_ACTION_LOCAL, true, 0, false);
1023 }
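/*
 * The effect is roughly equivalent to running the following in the current
 * transaction (an illustration, not an exact translation):
 *
 *   SET LOCAL lock_timeout TO '10000';
 */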
1024
1025
1026 static void
1027 UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
1028 {
1029 const bool indexOK = true;
1030
1031 ScanKeyData scanKey[1];
1032 Datum values[Natts_pg_dist_node];
1033 bool isnull[Natts_pg_dist_node];
1034 bool replace[Natts_pg_dist_node];
1035
1036 Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
1037 TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
1038
1039 ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid,
1040 BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodeId));
1041
1042 SysScanDesc scanDescriptor = systable_beginscan(pgDistNode, DistNodeNodeIdIndexId(),
1043 indexOK,
1044 NULL, 1, scanKey);
1045
1046 HeapTuple heapTuple = systable_getnext(scanDescriptor);
1047 if (!HeapTupleIsValid(heapTuple))
1048 {
1049 ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
1050 newNodeName, newNodePort)));
1051 }
1052
1053 memset(replace, 0, sizeof(replace));
1054
1055 values[Anum_pg_dist_node_nodeport - 1] = Int32GetDatum(newNodePort);
1056 isnull[Anum_pg_dist_node_nodeport - 1] = false;
1057 replace[Anum_pg_dist_node_nodeport - 1] = true;
1058
1059 values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(newNodeName);
1060 isnull[Anum_pg_dist_node_nodename - 1] = false;
1061 replace[Anum_pg_dist_node_nodename - 1] = true;
1062
1063 heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
1064
1065 CatalogTupleUpdate(pgDistNode, &heapTuple->t_self, heapTuple);
1066
1067 CitusInvalidateRelcacheByRelid(DistNodeRelationId());
1068
1069 CommandCounterIncrement();
1070
1071 systable_endscan(scanDescriptor);
1072 table_close(pgDistNode, NoLock);
1073 }
1074
1075
1076 /*
1077 * get_shard_id_for_distribution_column function takes a distributed table name and a
1078 * distribution value, then returns the id of the shard which belongs to the given table
1079 * and contains the given value. It works for hash and range distributed tables and reference tables.
1080 */
1081 Datum
1082 get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
1083 {
1084 CheckCitusVersion(ERROR);
1085
1086 ShardInterval *shardInterval = NULL;
1087
1088 /*
1089 * To allow the optional parameter to be NULL, we defined this UDF as non-strict; therefore
1090 * we need to check all parameters for NULL values.
1091 */
1092 if (PG_ARGISNULL(0))
1093 {
1094 ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1095 errmsg("relation cannot be NULL")));
1096 }
1097
1098 Oid relationId = PG_GETARG_OID(0);
1099 EnsureTablePermissions(relationId, ACL_SELECT);
1100
1101 if (!IsCitusTable(relationId))
1102 {
1103 ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1104 errmsg("relation is not distributed")));
1105 }
1106
1107 if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
1108 {
1109 List *shardIntervalList = LoadShardIntervalList(relationId);
1110 if (shardIntervalList == NIL)
1111 {
1112 PG_RETURN_INT64(0);
1113 }
1114
1115 shardInterval = (ShardInterval *) linitial(shardIntervalList);
1116 }
1117 else if (IsCitusTableType(relationId, HASH_DISTRIBUTED) ||
1118 IsCitusTableType(relationId, RANGE_DISTRIBUTED))
1119 {
1120 CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
1121
1122 /* if given table is not reference table, distributionValue cannot be NULL */
1123 if (PG_ARGISNULL(1))
1124 {
1125 ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1126 errmsg("distribution value cannot be NULL for tables other "
1127 "than reference tables.")));
1128 }
1129
1130 Datum inputDatum = PG_GETARG_DATUM(1);
1131 Oid inputDataType = get_fn_expr_argtype(fcinfo->flinfo, 1);
1132 char *distributionValueString = DatumToString(inputDatum, inputDataType);
1133
1134 Var *distributionColumn = DistPartitionKeyOrError(relationId);
1135 Oid distributionDataType = distributionColumn->vartype;
1136
1137 Datum distributionValueDatum = StringToDatum(distributionValueString,
1138 distributionDataType);
1139
1140 shardInterval = FindShardInterval(distributionValueDatum, cacheEntry);
1141 }
1142 else
1143 {
1144 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1145 errmsg("finding shard id of given distribution value is only "
1146 "supported for hash partitioned tables, range partitioned "
1147 "tables and reference tables.")));
1148 }
1149
1150 if (shardInterval != NULL)
1151 {
1152 PG_RETURN_INT64(shardInterval->shardId);
1153 }
1154
1155 PG_RETURN_INT64(0);
1156 }
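/*
 * Illustrative usage; the table name and distribution value are
 * placeholders:
 *
 *   SELECT get_shard_id_for_distribution_column('events'::regclass, 42);
 */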
1157
1158
1159 /*
1160 * FindWorkerNode searches over the worker nodes and returns the workerNode
1161 * if it already exists. Else, the function returns NULL.
1162 */
1163 WorkerNode *
1164 FindWorkerNode(const char *nodeName, int32 nodePort)
1165 {
1166 HTAB *workerNodeHash = GetWorkerNodeHash();
1167 bool handleFound = false;
1168
1169 WorkerNode *searchedNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
1170 strlcpy(searchedNode->workerName, nodeName, WORKER_LENGTH);
1171 searchedNode->workerPort = nodePort;
1172
1173 void *hashKey = (void *) searchedNode;
1174 WorkerNode *cachedWorkerNode = (WorkerNode *) hash_search(workerNodeHash, hashKey,
1175 HASH_FIND,
1176 &handleFound);
1177 if (handleFound)
1178 {
1179 WorkerNode *workerNode = (WorkerNode *) palloc(sizeof(WorkerNode));
1180 *workerNode = *cachedWorkerNode;
1181 return workerNode;
1182 }
1183
1184 return NULL;
1185 }
1186
1187
1188 /*
1189 * FindWorkerNodeOrError searches over the worker nodes and returns the workerNode
1190 * if it exists, otherwise it errors out.
1191 */
1192 WorkerNode *
1193 FindWorkerNodeOrError(const char *nodeName, int32 nodePort)
1194 {
1195 WorkerNode *node = FindWorkerNode(nodeName, nodePort);
1196 if (node == NULL)
1197 {
1198 ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND),
1199 errmsg("node %s:%d not found", nodeName, nodePort)));
1200 }
1201 return node;
1202 }
1203
1204
1205 /*
1206 * FindWorkerNodeAnyCluster returns the workerNode no matter which cluster it is a part
1207 * of. FindWorkerNode, like almost every other function, acts as if nodes in other
1208 * clusters do not exist.
1209 */
1210 WorkerNode *
1211 FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
1212 {
1213 WorkerNode *workerNode = NULL;
1214
1215 Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
1216 TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
1217
1218 HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
1219 if (heapTuple != NULL)
1220 {
1221 workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
1222 }
1223
1224 table_close(pgDistNode, NoLock);
1225 return workerNode;
1226 }
1227
1228
1229 /*
1230 * ReadDistNode iterates over pg_dist_node table, converts each row
1231 * into its memory representation (i.e., WorkerNode) and adds them into
1232 * a list. Lastly, the list is returned to the caller.
1233 *
1234 * It skips nodes which are not in the current cluster unless requested to do otherwise
1235 * by includeNodesFromOtherClusters.
1236 */
1237 List *
1238 ReadDistNode(bool includeNodesFromOtherClusters)
1239 {
1240 ScanKeyData scanKey[1];
1241 int scanKeyCount = 0;
1242 List *workerNodeList = NIL;
1243
1244 Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
1245
1246 SysScanDesc scanDescriptor = systable_beginscan(pgDistNode,
1247 InvalidOid, false,
1248 NULL, scanKeyCount, scanKey);
1249
1250 TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
1251
1252 HeapTuple heapTuple = systable_getnext(scanDescriptor);
1253 while (HeapTupleIsValid(heapTuple))
1254 {
1255 WorkerNode *workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
1256
1257 if (includeNodesFromOtherClusters ||
1258 strncmp(workerNode->nodeCluster, CurrentCluster, WORKER_LENGTH) == 0)
1259 {
1260 /* the coordinator acts as if it never sees nodes not in its cluster */
1261 workerNodeList = lappend(workerNodeList, workerNode);
1262 }
1263
1264 heapTuple = systable_getnext(scanDescriptor);
1265 }
1266
1267 systable_endscan(scanDescriptor);
1268 table_close(pgDistNode, NoLock);
1269
1270 return workerNodeList;
1271 }
1272
1273
1274 /*
1275 * RemoveNodeFromCluster removes the provided node from the pg_dist_node table of
1276 * the master node and all nodes with metadata.
1277 * The call to master_remove_node should be done by the superuser. If there are
1278 * active shard placements on the node, the function errors out.
1279 * This function also deletes all reference table placements belonging to the given node
1280 * from pg_dist_placement, but it does not drop the actual placements on the node. It also
1281 * modifies the replication factor of the colocation group of reference tables, so that
1282 * the replication factor will be equal to the worker count.
1283 */
1284 static void
1285 RemoveNodeFromCluster(char *nodeName, int32 nodePort)
1286 {
1287 WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
1288 if (NodeIsPrimary(workerNode))
1289 {
1290 if (CanRemoveReferenceTablePlacements())
1291 {
1292 /*
1293 * Delete reference table placements so they are not taken into account
1294 * for the check if there are placements after this.
1295 */
1296 DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId);
1297 }
1298 if (NodeGroupHasLivePlacements(workerNode->groupId))
1299 {
1300 if (ActivePrimaryNodeCount() == 1 && ClusterHasReferenceTable())
1301 {
1302 ereport(ERROR, (errmsg(
1303 "cannot remove the last worker node because there are reference "
1304 "tables and it would cause data loss on reference tables"),
1305 errhint(
1306 "To proceed, either drop the reference tables or use "
1307 "undistribute_table() function to convert them to local tables")));
1308 }
1309 ereport(ERROR, (errmsg("cannot remove the primary node of a node group "
1310 "which has shard placements"),
1311 errhint(
1312 "To proceed, either drop the distributed tables or use "
1313 "undistribute_table() function to convert them to local tables")));
1314 }
1315
1316 /*
1317 * Secondary nodes are read-only, never 2PC is used.
1318 * Hence, no items can be inserted to pg_dist_transaction for secondary nodes.
1319 */
1320 DeleteWorkerTransactions(workerNode);
1321 }
1322
1323 DeleteNodeRow(workerNode->workerName, nodePort);
1324
1325 RemoveOldShardPlacementForNodeGroup(workerNode->groupId);
1326
1327 char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
1328
1329 /* make sure we don't have any lingering session lifespan connections */
1330 CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort);
1331
1332 SendCommandToWorkersWithMetadata(nodeDeleteCommand);
1333 }
1334
1335
1336 /*
1337 * RemoveOldShardPlacementForNodeGroup removes all old shard placements
1338 * for the given node group from pg_dist_placement.
1339 */
1340 static void
1341 RemoveOldShardPlacementForNodeGroup(int groupId)
1342 {
1343 /*
1344 * Prevent concurrent deferred drop
1345 */
1346 LockPlacementCleanup();
1347 List *shardPlacementsOnNode = AllShardPlacementsOnNodeGroup(groupId);
1348 GroupShardPlacement *placement = NULL;
1349 foreach_ptr(placement, shardPlacementsOnNode)
1350 {
1351 if (placement->shardState == SHARD_STATE_TO_DELETE)
1352 {
1353 DeleteShardPlacementRow(placement->placementId);
1354 }
1355 }
1356 }
1357
1358
1359 /*
1360 * CanRemoveReferenceTablePlacements returns true if active primary
1361 * node count is more than 1, which means that even if we remove a node
1362 * we will still have some other node that has reference table placement.
1363 */
1364 static bool
1365 CanRemoveReferenceTablePlacements(void)
1366 {
1367 return ActivePrimaryNodeCount() > 1;
1368 }
1369
1370
1371 /* CountPrimariesWithMetadata returns the number of primary nodes which have metadata. */
1372 uint32
1373 CountPrimariesWithMetadata(void)
1374 {
1375 uint32 primariesWithMetadata = 0;
1376 WorkerNode *workerNode = NULL;
1377
1378 HASH_SEQ_STATUS status;
1379 HTAB *workerNodeHash = GetWorkerNodeHash();
1380
1381 hash_seq_init(&status, workerNodeHash);
1382
1383 while ((workerNode = hash_seq_search(&status)) != NULL)
1384 {
1385 if (workerNode->hasMetadata && NodeIsPrimary(workerNode))
1386 {
1387 primariesWithMetadata++;
1388 }
1389 }
1390
1391 return primariesWithMetadata;
1392 }
1393
1394
1395 /*
1396 * AddNodeMetadata checks the given node information and adds the specified node to the
1397 * pg_dist_node table of the master and workers with metadata.
1398 * If the node already exists, the function returns the id of the node.
1399 * If not, the following procedure is followed while adding a node: If the groupId is not
1400 * explicitly given by the user, the function picks the group that the new node should
1401 * be in with respect to GroupSize. Then, the new node is inserted into the local
1402 * pg_dist_node as well as the nodes with hasmetadata=true.
1403 */
1404 static int
1405 AddNodeMetadata(char *nodeName, int32 nodePort,
1406 NodeMetadata *nodeMetadata,
1407 bool *nodeAlreadyExists)
1408 {
1409 EnsureCoordinator();
1410
1411 *nodeAlreadyExists = false;
1412
1413 /*
1414 * Prevent / wait for concurrent modification before checking whether
1415 * the worker already exists in pg_dist_node.
1416 */
1417 LockRelationOid(DistNodeRelationId(), RowShareLock);
1418
1419 WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
1420 if (workerNode != NULL)
1421 {
1422 /* return early without holding locks when the node already exists */
1423 *nodeAlreadyExists = true;
1424
1425 return workerNode->nodeId;
1426 }
1427
1428 /*
1429 * We are going to change pg_dist_node, prevent any concurrent reads that
1430 * are not tolerant to concurrent node addition by taking an exclusive
1431 * lock (conflicts with all but AccessShareLock).
1432 *
1433 * We may want to relax or have more fine-grained locking in the future
1434 * to allow users to add multiple nodes concurrently.
1435 */
1436 LockRelationOid(DistNodeRelationId(), ExclusiveLock);
1437
1438 /* recheck in case 2 node additions pass the first check concurrently */
1439 workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
1440 if (workerNode != NULL)
1441 {
1442 *nodeAlreadyExists = true;
1443
1444 return workerNode->nodeId;
1445 }
1446
1447 if (nodeMetadata->groupId != COORDINATOR_GROUP_ID &&
1448 strcmp(nodeName, "localhost") != 0)
1449 {
1450 /*
1451 * User tries to add a worker with a non-localhost address. If the coordinator
1452 * is added with "localhost" as well, the worker won't be able to connect.
1453 */
1454
1455 bool isCoordinatorInMetadata = false;
1456 WorkerNode *coordinatorNode = PrimaryNodeForGroup(COORDINATOR_GROUP_ID,
1457 &isCoordinatorInMetadata);
1458 if (isCoordinatorInMetadata &&
1459 strcmp(coordinatorNode->workerName, "localhost") == 0)
1460 {
1461 ereport(ERROR, (errmsg("cannot add a worker node when the coordinator "
1462 "hostname is set to localhost"),
1463 errdetail("Worker nodes need to be able to connect to the "
1464 "coordinator to transfer data."),
1465 errhint("Use SELECT citus_set_coordinator_host('<hostname>') "
1466 "to configure the coordinator hostname")));
1467 }
1468 }
1469
1470 /*
1471 * When adding the first worker when the coordinator has shard placements,
1472 * print a notice on how to drain the coordinator.
1473 */
1474 if (nodeMetadata->groupId != COORDINATOR_GROUP_ID && CoordinatorAddedAsWorkerNode() &&
1475 ActivePrimaryNonCoordinatorNodeCount() == 0 &&
1476 NodeGroupHasShardPlacements(COORDINATOR_GROUP_ID, true))
1477 {
1478 WorkerNode *coordinator = CoordinatorNodeIfAddedAsWorkerOrError();
1479
1480 ereport(NOTICE, (errmsg("shards are still on the coordinator after adding the "
1481 "new node"),
1482 errhint("Use SELECT rebalance_table_shards(); to balance "
1483 "shards data between workers and coordinator or "
1484 "SELECT citus_drain_node(%s,%d); to permanently "
1485 "move shards away from the coordinator.",
1486 quote_literal_cstr(coordinator->workerName),
1487 coordinator->workerPort)));
1488 }
1489
1490 /* user lets Citus decide the group that the newly added node should be in */
1491 if (nodeMetadata->groupId == INVALID_GROUP_ID)
1492 {
1493 nodeMetadata->groupId = GetNextGroupId();
1494 }
1495
1496 if (nodeMetadata->groupId == COORDINATOR_GROUP_ID)
1497 {
1498 /*
1499 * The coordinator always has the authoritative metadata; reflect this
1500 * fact in pg_dist_node.
1501 */
1502 nodeMetadata->hasMetadata = true;
1503 nodeMetadata->metadataSynced = true;
1504
1505 /*
1506 * There is no concept of an "inactive" coordinator, so hard code it.
1507 */
1508 nodeMetadata->isActive = true;
1509 }
1510
1511 /* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */
1512 if (nodeMetadata->nodeRole != InvalidOid && nodeMetadata->nodeRole ==
1513 PrimaryNodeRoleId())
1514 {
1515 WorkerNode *existingPrimaryNode = PrimaryNodeForGroup(nodeMetadata->groupId,
1516 NULL);
1517
1518 if (existingPrimaryNode != NULL)
1519 {
1520 ereport(ERROR, (errmsg("group %d already has a primary node",
1521 nodeMetadata->groupId)));
1522 }
1523 }
1524
1525 if (nodeMetadata->nodeRole == PrimaryNodeRoleId())
1526 {
1527 if (strncmp(nodeMetadata->nodeCluster,
1528 WORKER_DEFAULT_CLUSTER,
1529 WORKER_LENGTH) != 0)
1530 {
1531 ereport(ERROR, (errmsg("primaries must be added to the default cluster")));
1532 }
1533 }
1534
1535 /* generate the new node id from the sequence */
1536 int nextNodeIdInt = GetNextNodeId();
1537
1538 InsertNodeRow(nextNodeIdInt, nodeName, nodePort, nodeMetadata);
1539
1540 workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
1541
1542 /* send the delete command to all primary nodes with metadata */
1543 char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
1544 SendCommandToWorkersWithMetadata(nodeDeleteCommand);
1545
1546 /* finally prepare the insert command and send it to all primary nodes */
1547 uint32 primariesWithMetadata = CountPrimariesWithMetadata();
1548 if (primariesWithMetadata != 0)
1549 {
1550 List *workerNodeList = list_make1(workerNode);
1551 char *nodeInsertCommand = NodeListInsertCommand(workerNodeList);
1552
1553 SendCommandToWorkersWithMetadata(nodeInsertCommand);
1554 }
1555
1556 return workerNode->nodeId;
1557 }
1558
1559
1560 /*
1561 * SetWorkerColumn function sets the column with the specified index
1562 * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly.
1563 * It also sends the same command for node update to other metadata nodes.
1564 * If anything fails during the transaction, we roll it back.
1565 * Returns the new worker node after the modification.
1566 */
1567 WorkerNode *
1568 SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
1569 {
1570 workerNode = SetWorkerColumnLocalOnly(workerNode, columnIndex, value);
1571
1572 char *metadataSyncCommand = GetMetadataSyncCommandToSetNodeColumn(workerNode,
1573 columnIndex,
1574 value);
1575
1576 SendCommandToWorkersWithMetadata(metadataSyncCommand);
1577
1578 return workerNode;
1579 }
1580
1581
1582 /*
1583 * SetWorkerColumnOptional function sets the column with the specified index
1584 * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly.
1585 * It also sends the same command optionally for node update to other metadata nodes,
1586 * meaning that failures are ignored. Returns the new worker node after the modification.
1587 */
1588 WorkerNode *
1589 SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value)
1590 {
1591 char *metadataSyncCommand = GetMetadataSyncCommandToSetNodeColumn(workerNode,
1592 columnIndex,
1593 value);
1594
1595 List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
1596 ShareLock);
1597
1598 /* open connections in parallel */
1599 WorkerNode *worker = NULL;
1600 foreach_ptr(worker, workerNodeList)
1601 {
1602 bool success = SendOptionalCommandListToWorkerInCoordinatedTransaction(
1603 worker->workerName, worker->workerPort,
1604 CurrentUserName(),
1605 list_make1(metadataSyncCommand));
1606
1607 if (!success)
1608 {
1609 /* metadata out of sync, mark the worker as not synced */
1610 ereport(WARNING, (errmsg("Updating the metadata of the node %s:%d "
1611 "is failed on node %s:%d."
1612 "Metadata on %s:%d is marked as out of sync.",
1613 workerNode->workerName, workerNode->workerPort,
1614 worker->workerName, worker->workerPort,
1615 worker->workerName, worker->workerPort)));
1616
1617 SetWorkerColumnLocalOnly(worker, Anum_pg_dist_node_metadatasynced,
1618 BoolGetDatum(false));
1619 }
1620 else if (workerNode->nodeId == worker->nodeId)
1621 {
1622 /*
1623 * If this is the node we want to update and it is updated successfully,
1624 * then we can safely update the flag on the coordinator as well.
1625 */
1626 SetWorkerColumnLocalOnly(workerNode, columnIndex, value);
1627 }
1628 }
1629
1630 return FindWorkerNode(workerNode->workerName, workerNode->workerPort);
1631 }
1632
1633
1634 /*
1635 * SetWorkerColumnLocalOnly function sets the column with the specified index
1636 * (see pg_dist_node.h) on the worker in pg_dist_node.
1637 * It returns the new worker node after the modification.
1638 */
1639 WorkerNode *
1640 SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
1641 {
1642 Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
1643 TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
1644 HeapTuple heapTuple = GetNodeTuple(workerNode->workerName, workerNode->workerPort);
1645
1646 Datum values[Natts_pg_dist_node];
1647 bool isnull[Natts_pg_dist_node];
1648 bool replace[Natts_pg_dist_node];
1649
1650 if (heapTuple == NULL)
1651 {
1652 ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
1653 workerNode->workerName, workerNode->workerPort)));
1654 }
1655
1656 memset(replace, 0, sizeof(replace));
1657 values[columnIndex - 1] = value;
1658 isnull[columnIndex - 1] = false;
1659 replace[columnIndex - 1] = true;
1660
1661 heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
1662
1663 CatalogTupleUpdate(pgDistNode, &heapTuple->t_self, heapTuple);
1664
1665 CitusInvalidateRelcacheByRelid(DistNodeRelationId());
1666 CommandCounterIncrement();
1667
1668 WorkerNode *newWorkerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
1669
1670 table_close(pgDistNode, NoLock);
1671
1672 return newWorkerNode;
1673 }
1674
1675
1676 /*
1677 * GetMetadataSyncCommandToSetNodeColumn checks whether the given workerNode and value
1678 * are valid. Then it returns the necessary metadata sync command as a string.
1679 */
1680 static char *
1681 GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, int columnIndex, Datum
1682 value)
1683 {
1684 char *metadataSyncCommand = NULL;
1685
1686 switch (columnIndex)
1687 {
1688 case Anum_pg_dist_node_hasmetadata:
1689 {
1690 ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "hasmetadata");
1691 metadataSyncCommand = NodeHasmetadataUpdateCommand(workerNode->nodeId,
1692 DatumGetBool(value));
1693 break;
1694 }
1695
1696 case Anum_pg_dist_node_isactive:
1697 {
1698 ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "isactive");
1699
1700 metadataSyncCommand = NodeStateUpdateCommand(workerNode->nodeId,
1701 DatumGetBool(value));
1702 break;
1703 }
1704
1705 case Anum_pg_dist_node_shouldhaveshards:
1706 {
1707 metadataSyncCommand = ShouldHaveShardsUpdateCommand(workerNode->nodeId,
1708 DatumGetBool(value));
1709 break;
1710 }
1711
1712 case Anum_pg_dist_node_metadatasynced:
1713 {
1714 ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "metadatasynced");
1715 metadataSyncCommand = NodeMetadataSyncedUpdateCommand(workerNode->nodeId,
1716 DatumGetBool(value));
1717 break;
1718 }
1719
1720 default:
1721 {
1722 ereport(ERROR, (errmsg("cannot set column with index %d on pg_dist_node",
1723 columnIndex)));
1724 }
1725 }
1726
1727 return metadataSyncCommand;
1728 }
1729
1730
1731 /*
1732 * NodeHasmetadataUpdateCommand generates and returns a SQL UPDATE command
1733 * that updates the hasmetadata column of pg_dist_node, for the given nodeid.
1734 */
1735 static char *
1736 NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata)
1737 {
1738 StringInfo updateCommand = makeStringInfo();
1739 char *hasMetadataString = hasMetadata ? "TRUE" : "FALSE";
1740 appendStringInfo(updateCommand,
1741 "UPDATE pg_dist_node SET hasmetadata = %s "
1742 "WHERE nodeid = %u",
1743 hasMetadataString, nodeId);
1744 return updateCommand->data;
1745 }
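
/*
 * For example, NodeHasmetadataUpdateCommand(3, true) produces the following
 * command (nodeid 3 is hypothetical):
 *
 *     UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 3
 */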
1746
1747
1748 /*
1749 * NodeMetadataSyncedUpdateCommand generates and returns a SQL UPDATE command
1750 * that updates the metadataSynced column of pg_dist_node, for the given nodeid.
1751 */
1752 static char *
1753 NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced)
1754 {
1755 StringInfo updateCommand = makeStringInfo();
1756 char *metadataSyncedString = metadataSynced ? "TRUE" : "FALSE";
1757 appendStringInfo(updateCommand,
1758 "UPDATE pg_dist_node SET metadatasynced = %s "
1759 "WHERE nodeid = %u",
1760 metadataSyncedString, nodeId);
1761 return updateCommand->data;
1762 }
1763
1764
1765 /*
1766 * ErrorIfCoordinatorMetadataSetFalse throws an error if the input node
1767 * is the coordinator and the value is false.
1768 */
1769 static void
1770 ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *field)
1771 {
1772 bool valueBool = DatumGetBool(value);
1773 if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID)
1774 {
1775 ereport(ERROR, (errmsg("cannot change \"%s\" field of the "
1776 "coordinator node", field)));
1777 }
1778 }
1779
1780
1781 /*
1782 * SetShouldHaveShards function sets the shouldhaveshards column of the
1783 * specified worker in pg_dist_node. It also propagates this to other metadata nodes.
1784 * It returns the new worker node after the modification.
1785 */
1786 static WorkerNode *
1787 SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards)
1788 {
1789 return SetWorkerColumn(workerNode, Anum_pg_dist_node_shouldhaveshards, BoolGetDatum(
1790 shouldHaveShards));
1791 }
1792
1793
1794 /*
1795 * SetNodeState function sets the isactive column of the specified worker in
1796 * pg_dist_node to isActive. Also propagates this to other metadata nodes.
1797 * It returns the new worker node after the modification.
1798 */
1799 static WorkerNode *
1800 SetNodeState(char *nodeName, int nodePort, bool isActive)
1801 {
1802 WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
1803 return SetWorkerColumn(workerNode, Anum_pg_dist_node_isactive, BoolGetDatum(
1804 isActive));
1805 }
1806
1807
1808 /*
1809 * GetNodeTuple function returns the heap tuple of given nodeName and nodePort. If the
1810 * node is not found this function returns NULL.
1811 *
1812 * This function may return worker nodes from other clusters.
1813 */
1814 static HeapTuple
1815 GetNodeTuple(const char *nodeName, int32 nodePort)
1816 {
1817 Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
1818 const int scanKeyCount = 2;
1819 const bool indexOK = false;
1820
1821 ScanKeyData scanKey[2];
1822 HeapTuple nodeTuple = NULL;
1823
1824 ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
1825 BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
1826 ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
1827 BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort));
1828 SysScanDesc scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK,
1829 NULL, scanKeyCount, scanKey);
1830
1831 HeapTuple heapTuple = systable_getnext(scanDescriptor);
1832 if (HeapTupleIsValid(heapTuple))
1833 {
1834 nodeTuple = heap_copytuple(heapTuple);
1835 }
1836
1837 systable_endscan(scanDescriptor);
1838 table_close(pgDistNode, NoLock);
1839
1840 return nodeTuple;
1841 }
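
/*
 * Because the returned tuple is copied with heap_copytuple() before the scan
 * and relation are closed, callers may keep using it afterwards and should
 * free it when done. A minimal sketch (host and port hypothetical):
 *
 *     HeapTuple nodeTuple = GetNodeTuple("10.0.0.5", 5432);
 *     if (nodeTuple == NULL)
 *     {
 *         ereport(ERROR, (errmsg("node does not exist")));
 *     }
 *     ... use the tuple ...
 *     heap_freetuple(nodeTuple);
 */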
1842
1843
1844 /*
1845 * GetNextGroupId allocates and returns a unique groupId for the group
1846 * to be created. This allocation occurs both in shared memory and in write
1847 * ahead logs; writing to logs avoids the risk of having groupId collisions.
1848 *
1849 * Please note that the caller is still responsible for finalizing node data
1850 * and the groupId with the master node. Further note that this function relies
1851 * on an internal sequence created in initdb to generate unique identifiers.
1852 */
1853 int32
1854 GetNextGroupId()
1855 {
1856 text *sequenceName = cstring_to_text(GROUPID_SEQUENCE_NAME);
1857 Oid sequenceId = ResolveRelationId(sequenceName, false);
1858 Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
1859 Oid savedUserId = InvalidOid;
1860 int savedSecurityContext = 0;
1861
1862 GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
1863 SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
1864
1865 /* generate new and unique groupId from sequence */
1866 Datum groupIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
1867
1868 SetUserIdAndSecContext(savedUserId, savedSecurityContext);
1869
1870 int32 groupId = DatumGetInt32(groupIdDatum);
1871
1872 return groupId;
1873 }
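
/*
 * Conceptually, and ignoring the temporary switch to the extension owner,
 * this is equivalent to running the SQL below; the sequence name comes from
 * GROUPID_SEQUENCE_NAME and is assumed here to resolve to pg_dist_groupid_seq:
 *
 *     SELECT nextval('pg_dist_groupid_seq');
 */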
1874
1875
1876 /*
1877 * GetNextNodeId allocates and returns a unique nodeId for the node
1878 * to be added. This allocation occurs both in shared memory and in write
1879 * ahead logs; writing to logs avoids the risk of having nodeId collisions.
1880 *
1881 * Please note that the caller is still responsible for finalizing node data
1882 * and the nodeId with the master node. Further note that this function relies
1883 * on an internal sequence created in initdb to generate unique identifiers.
1884 */
1885 int
1886 GetNextNodeId()
1887 {
1888 text *sequenceName = cstring_to_text(NODEID_SEQUENCE_NAME);
1889 Oid sequenceId = ResolveRelationId(sequenceName, false);
1890 Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
1891 Oid savedUserId = InvalidOid;
1892 int savedSecurityContext = 0;
1893
1894 GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
1895 SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
1896
1897 /* generate new and unique nodeId from sequence */
1898 Datum nextNodeIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
1899
1900 SetUserIdAndSecContext(savedUserId, savedSecurityContext);
1901
1902 int nextNodeId = DatumGetUInt32(nextNodeIdDatum);
1903
1904 return nextNodeId;
1905 }
1906
1907
1908 /*
1909 * EnsureCoordinator checks if the current node is the coordinator. If it is not,
1910 * the function errors out.
1911 */
1912 void
1913 EnsureCoordinator(void)
1914 {
1915 int32 localGroupId = GetLocalGroupId();
1916
1917 if (localGroupId != 0)
1918 {
1919 ereport(ERROR, (errmsg("operation is not allowed on this node"),
1920 errhint("Connect to the coordinator and run it again.")));
1921 }
1922 }
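
/*
 * Typical usage: node-management UDFs call EnsureCoordinator() first so that
 * they can only run on the coordinator (group 0). A minimal sketch with a
 * hypothetical UDF:
 *
 *     Datum
 *     citus_example_udf(PG_FUNCTION_ARGS)
 *     {
 *         EnsureCoordinator();
 *         ... do coordinator-only work ...
 *         PG_RETURN_VOID();
 *     }
 */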
1923
1924
1925 /*
1926 * InsertCoordinatorIfClusterEmpty can be used to ensure Citus tables can be
1927 * created even on a node that has just performed CREATE EXTENSION citus;
1928 */
1929 void
1930 InsertCoordinatorIfClusterEmpty(void)
1931 {
1932 /* prevent concurrent node additions */
1933 Relation pgDistNode = table_open(DistNodeRelationId(), RowShareLock);
1934
1935 if (!HasAnyNodes())
1936 {
1937 /*
1938 * create_distributed_table is being called for the first time and there are
1939 * no pg_dist_node records. Add a record for the coordinator.
1940 */
1941 InsertPlaceholderCoordinatorRecord();
1942 }
1943
1944 /*
1945 * We release the lock; if InsertPlaceholderCoordinatorRecord was called,
1946 * we already hold a stronger (RowExclusive) lock.
1947 */
1948 table_close(pgDistNode, RowShareLock);
1949 }
1950
1951
1952 /*
1953 * InsertPlaceholderCoordinatorRecord inserts a placeholder record for the coordinator
1954 * to be able to create distributed tables on a single node.
1955 */
1956 static void
1957 InsertPlaceholderCoordinatorRecord(void)
1958 {
1959 NodeMetadata nodeMetadata = DefaultNodeMetadata();
1960 nodeMetadata.groupId = 0;
1961 nodeMetadata.shouldHaveShards = true;
1962 nodeMetadata.nodeRole = PrimaryNodeRoleId();
1963 nodeMetadata.nodeCluster = "default";
1964
1965 bool nodeAlreadyExists = false;
1966
1967 /* as long as there is a single node, localhost should be ok */
1968 AddNodeMetadata(LocalHostName, PostPortNumber, &nodeMetadata, &nodeAlreadyExists);
1969 }
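
/*
 * The placeholder row created above looks roughly as follows; nodeid 1 and
 * localhost:5432 are illustrative defaults from a stock configuration, not
 * guaranteed values:
 *
 *     nodeid | groupid | nodename  | nodeport | noderole | nodecluster
 *     -------+---------+-----------+----------+----------+------------
 *          1 |       0 | localhost |     5432 | primary  | default
 */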
1970
1971
1972 /*
1973 * InsertNodeRow opens the node system catalog, and inserts a new row with the
1974 * given values into that system catalog.
1975 *
1976 * NOTE: If you call this function you probably need to have taken a
1977 * ShareRowExclusiveLock then checked that you're not adding a second primary to
1978 * an existing group. If you don't it's possible for the metadata to become inconsistent.
1979 */
1980 static void
1981 InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata)
1982 {
1983 Datum values[Natts_pg_dist_node];
1984 bool isNulls[Natts_pg_dist_node];
1985
1986 Datum nodeClusterStringDatum = CStringGetDatum(nodeMetadata->nodeCluster);
1987 Datum nodeClusterNameDatum = DirectFunctionCall1(namein, nodeClusterStringDatum);
1988
1989 /* form new node tuple */
1990 memset(values, 0, sizeof(values));
1991 memset(isNulls, false, sizeof(isNulls));
1992
1993 values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid);
1994 values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(nodeMetadata->groupId);
1995 values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
1996 values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
1997 values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeMetadata->nodeRack);
1998 values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(nodeMetadata->hasMetadata);
1999 values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(
2000 nodeMetadata->metadataSynced);
2001 values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(nodeMetadata->isActive);
2002 values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeMetadata->nodeRole);
2003 values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
2004 values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum(
2005 nodeMetadata->shouldHaveShards);
2006
2007 Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
2008
2009 TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
2010 HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
2011
2012 CatalogTupleInsert(pgDistNode, heapTuple);
2013
2014 CitusInvalidateRelcacheByRelid(DistNodeRelationId());
2015
2016 /* increment the counter so that next command can see the row */
2017 CommandCounterIncrement();
2018
2019 /* close relation */
2020 table_close(pgDistNode, NoLock);
2021 }
2022
2023
2024 /*
2025 * DeleteNodeRow removes the requested row from pg_dist_node table if it exists.
2026 */
2027 static void
2028 DeleteNodeRow(char *nodeName, int32 nodePort)
2029 {
2030 const int scanKeyCount = 2;
2031 bool indexOK = false;
2032
2033 ScanKeyData scanKey[2];
2034 Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
2035
2036 /*
2037 * simple_heap_delete() expects that the caller has at least an
2038 * AccessShareLock on the replica identity index.
2039 */
2040 Relation replicaIndex = index_open(RelationGetReplicaIndex(pgDistNode),
2041 AccessShareLock);
2042
2043 ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
2044 BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
2045 ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
2046 BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort));
2047
2048 SysScanDesc heapScan = systable_beginscan(pgDistNode, InvalidOid, indexOK,
2049 NULL, scanKeyCount, scanKey);
2050
2051 HeapTuple heapTuple = systable_getnext(heapScan);
2052
2053 if (!HeapTupleIsValid(heapTuple))
2054 {
2055 ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
2056 nodeName, nodePort)));
2057 }
2058
2059 simple_heap_delete(pgDistNode, &(heapTuple->t_self));
2060
2061 systable_endscan(heapScan);
2062
2063 /* ensure future commands don't use the node we just removed */
2064 CitusInvalidateRelcacheByRelid(DistNodeRelationId());
2065
2066 /* increment the counter so that next command won't see the row */
2067 CommandCounterIncrement();
2068
2069 index_close(replicaIndex, AccessShareLock);
2070 table_close(pgDistNode, NoLock);
2071 }
2072
2073
2074 /*
2075 * TupleToWorkerNode takes in a heap tuple from pg_dist_node, and
2076 * converts this tuple to an equivalent struct in memory. The function assumes
2077 * the caller already has locks on the tuple, and doesn't perform any locking.
2078 */
2079 static WorkerNode *
2080 TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
2081 {
2082 Datum datumArray[Natts_pg_dist_node];
2083 bool isNullArray[Natts_pg_dist_node];
2084
2085 Assert(!HeapTupleHasNulls(heapTuple));
2086
2087 /*
2088 * This function can be called before "ALTER TABLE ... ADD COLUMN nodecluster ...",
2089 * therefore heap_deform_tuple() won't set the isNullArray entry for this column.
2090 * We initialize it to true to be safe in that case.
2091 */
2092 memset(isNullArray, true, sizeof(isNullArray));
2093
2094 /*
2095 * We use heap_deform_tuple() instead of heap_getattr() to expand tuple
2096 * to contain missing values when ALTER TABLE ADD COLUMN happens.
2097 */
2098 heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
2099
2100 char *nodeName = TextDatumGetCString(datumArray[Anum_pg_dist_node_nodename - 1]);
2101 char *nodeRack = TextDatumGetCString(datumArray[Anum_pg_dist_node_noderack - 1]);
2102
2103 WorkerNode *workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
2104 workerNode->nodeId = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeid - 1]);
2105 workerNode->workerPort = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeport - 1]);
2106 workerNode->groupId = DatumGetInt32(datumArray[Anum_pg_dist_node_groupid - 1]);
2107 strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH);
2108 strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
2109 workerNode->hasMetadata = DatumGetBool(datumArray[Anum_pg_dist_node_hasmetadata - 1]);
2110 workerNode->metadataSynced =
2111 DatumGetBool(datumArray[Anum_pg_dist_node_metadatasynced - 1]);
2112 workerNode->isActive = DatumGetBool(datumArray[Anum_pg_dist_node_isactive - 1]);
2113 workerNode->nodeRole = DatumGetObjectId(datumArray[Anum_pg_dist_node_noderole - 1]);
2114 workerNode->shouldHaveShards = DatumGetBool(
2115 datumArray[Anum_pg_dist_node_shouldhaveshards - 1]);
2117
2118 /*
2119 * nodecluster column can be missing. In the case of extension creation/upgrade,
2120 * master_initialize_node_metadata function is called before the nodecluster
2121 * column is added to pg_dist_node table.
2122 */
2123 if (!isNullArray[Anum_pg_dist_node_nodecluster - 1])
2124 {
2125 Name nodeClusterName =
2126 DatumGetName(datumArray[Anum_pg_dist_node_nodecluster - 1]);
2127 char *nodeClusterString = NameStr(*nodeClusterName);
2128 strlcpy(workerNode->nodeCluster, nodeClusterString, NAMEDATALEN);
2129 }
2130
2131 return workerNode;
2132 }
2133
2134
2135 /*
2136 * StringToDatum transforms a string representation into a Datum.
2137 */
2138 Datum
2139 StringToDatum(char *inputString, Oid dataType)
2140 {
2141 Oid typIoFunc = InvalidOid;
2142 Oid typIoParam = InvalidOid;
2143 int32 typeModifier = -1;
2144
2145 getTypeInputInfo(dataType, &typIoFunc, &typIoParam);
2146 getBaseTypeAndTypmod(dataType, &typeModifier);
2147
2148 Datum datum = OidInputFunctionCall(typIoFunc, inputString, typIoParam, typeModifier);
2149
2150 return datum;
2151 }
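
/*
 * StringToDatum pairs with DatumToString below; together they round-trip a
 * value through the type's input and output functions. An illustrative
 * sketch using the built-in int4 type:
 *
 *     Datum value = StringToDatum("42", INT4OID);
 *     char *asText = DatumToString(value, INT4OID);  (asText is "42")
 */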
2152
2153
2154 /*
2155 * DatumToString returns the string representation of the given datum.
2156 */
2157 char *
2158 DatumToString(Datum datum, Oid dataType)
2159 {
2160 Oid typIoFunc = InvalidOid;
2161 bool typIsVarlena = false;
2162
2163 getTypeOutputInfo(dataType, &typIoFunc, &typIsVarlena);
2164 char *outputString = OidOutputFunctionCall(typIoFunc, datum);
2165
2166 return outputString;
2167 }
2168
2169
2170 /*
2171 * UnsetMetadataSyncedForAll sets the metadatasynced column of all metadata
2172 * nodes to false. It returns true if it updated at least one node.
2173 */
2174 static bool
2175 UnsetMetadataSyncedForAll(void)
2176 {
2177 bool updatedAtLeastOne = false;
2178 ScanKeyData scanKey[2];
2179 int scanKeyCount = 2;
2180 bool indexOK = false;
2181
2182 /*
2183 * Concurrent citus_update_node() calls might iterate and try to update
2184 * pg_dist_node in different orders. To protect against deadlock, we
2185 * get an exclusive lock here.
2186 */
2187 Relation relation = table_open(DistNodeRelationId(), ExclusiveLock);
2188 TupleDesc tupleDescriptor = RelationGetDescr(relation);
2189 ScanKeyInit(&scanKey[0], Anum_pg_dist_node_hasmetadata,
2190 BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true));
2191 ScanKeyInit(&scanKey[1], Anum_pg_dist_node_metadatasynced,
2192 BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true));
2193
2194 CatalogIndexState indstate = CatalogOpenIndexes(relation);
2195
2196 SysScanDesc scanDescriptor = systable_beginscan(relation,
2197 InvalidOid, indexOK,
2198 NULL, scanKeyCount, scanKey);
2199
2200 HeapTuple heapTuple = systable_getnext(scanDescriptor);
2201 if (HeapTupleIsValid(heapTuple))
2202 {
2203 updatedAtLeastOne = true;
2204 }
2205
2206 while (HeapTupleIsValid(heapTuple))
2207 {
2208 Datum values[Natts_pg_dist_node];
2209 bool isnull[Natts_pg_dist_node];
2210 bool replace[Natts_pg_dist_node];
2211
2212 memset(replace, false, sizeof(replace));
2213 memset(isnull, false, sizeof(isnull));
2214 memset(values, 0, sizeof(values));
2215
2216 values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false);
2217 replace[Anum_pg_dist_node_metadatasynced - 1] = true;
2218
2219 HeapTuple newHeapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values,
2220 isnull, replace);
2222
2223 CatalogTupleUpdateWithInfo(relation, &newHeapTuple->t_self, newHeapTuple,
2224 indstate);
2225
2226 CommandCounterIncrement();
2227
2228 heap_freetuple(newHeapTuple);
2229
2230 heapTuple = systable_getnext(scanDescriptor);
2231 }
2232
2233 systable_endscan(scanDescriptor);
2234 CatalogCloseIndexes(indstate);
2235 table_close(relation, NoLock);
2236
2237 return updatedAtLeastOne;
2238 }
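
/*
 * The update loop above is logically equivalent to the following SQL,
 * executed while holding an ExclusiveLock on pg_dist_node:
 *
 *     UPDATE pg_dist_node SET metadatasynced = false
 *     WHERE hasmetadata AND metadatasynced;
 */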
2239