1 /*
2  * node_metadata.c
3  *	  Functions that operate on pg_dist_node
4  *
5  * Copyright (c) Citus Data, Inc.
6  */
7 #include "postgres.h"
8 #include "miscadmin.h"
9 #include "funcapi.h"
10 #include "utils/plancache.h"
11 
12 
13 #include "access/genam.h"
14 #include "access/heapam.h"
15 #include "access/htup.h"
16 #include "access/htup_details.h"
17 #include "access/skey.h"
19 #include "access/tupmacs.h"
20 #include "access/xact.h"
21 #include "catalog/indexing.h"
22 #include "catalog/namespace.h"
23 #include "commands/sequence.h"
24 #include "distributed/citus_acquire_lock.h"
25 #include "distributed/citus_safe_lib.h"
26 #include "distributed/colocation_utils.h"
27 #include "distributed/commands.h"
28 #include "distributed/commands/utility_hook.h"
29 #include "distributed/connection_management.h"
30 #include "distributed/maintenanced.h"
31 #include "distributed/coordinator_protocol.h"
32 #include "distributed/metadata_utility.h"
33 #include "distributed/metadata/distobject.h"
34 #include "distributed/metadata_cache.h"
35 #include "distributed/metadata_sync.h"
36 #include "distributed/multi_join_order.h"
37 #include "distributed/multi_router_planner.h"
38 #include "distributed/pg_dist_node.h"
39 #include "distributed/reference_table_utils.h"
40 #include "distributed/remote_commands.h"
41 #include "distributed/resource_lock.h"
42 #include "distributed/shardinterval_utils.h"
43 #include "distributed/shared_connection_stats.h"
44 #include "distributed/string_utils.h"
45 #include "distributed/transaction_recovery.h"
46 #include "distributed/version_compat.h"
47 #include "distributed/worker_manager.h"
48 #include "distributed/worker_transaction.h"
49 #include "lib/stringinfo.h"
50 #include "postmaster/postmaster.h"
51 #include "storage/bufmgr.h"
52 #include "storage/lmgr.h"
53 #include "storage/lock.h"
54 #include "storage/fd.h"
55 #include "utils/builtins.h"
56 #include "utils/fmgroids.h"
57 #include "utils/lsyscache.h"
58 #include "utils/rel.h"
59 #include "utils/relcache.h"
60 
61 #define INVALID_GROUP_ID -1
62 
63 /* default group size */
64 int GroupSize = 1;
65 
66 /* config variable managed via guc.c */
67 char *CurrentCluster = "default";
68 
69 /*
70  * Config variable to control whether we should replicate reference tables on
71  * node activation or we should defer it to shard creation.
72  */
73 bool ReplicateReferenceTablesOnActivate = true;
74 
75 /* did current transaction modify pg_dist_node? */
76 bool TransactionModifiedNodeMetadata = false;
77 
78 typedef struct NodeMetadata
79 {
80 	int32 groupId;
81 	char *nodeRack;
82 	bool hasMetadata;
83 	bool metadataSynced;
84 	bool isActive;
85 	Oid nodeRole;
86 	bool shouldHaveShards;
87 	char *nodeCluster;
88 } NodeMetadata;
89 
90 /* local function forward declarations */
91 static int ActivateNode(char *nodeName, int nodePort);
92 static bool CanRemoveReferenceTablePlacements(void);
93 static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
94 static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata
95 						   *nodeMetadata, bool *nodeAlreadyExists);
96 static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive);
97 static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort);
98 static int32 GetNextGroupId(void);
99 static int GetNextNodeId(void);
100 static void InsertPlaceholderCoordinatorRecord(void);
101 static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
102 						  *nodeMetadata);
103 static void DeleteNodeRow(char *nodename, int32 nodeport);
104 static void SetUpDistributedTableDependencies(WorkerNode *workerNode);
105 static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
106 static void PropagateNodeWideObjects(WorkerNode *newWorkerNode);
107 static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
108 static bool NodeIsLocal(WorkerNode *worker);
109 static void SetLockTimeoutLocally(int32 lock_cooldown);
110 static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
111 static bool UnsetMetadataSyncedForAll(void);
112 static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
113 													int columnIndex,
114 													Datum value);
115 static char * NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata);
116 static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced);
117 static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
118 											   char *field);
119 static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
120 static void RemoveOldShardPlacementForNodeGroup(int groupId);
121 
122 /* declarations for dynamic loading */
123 PG_FUNCTION_INFO_V1(citus_set_coordinator_host);
124 PG_FUNCTION_INFO_V1(citus_add_node);
125 PG_FUNCTION_INFO_V1(master_add_node);
126 PG_FUNCTION_INFO_V1(citus_add_inactive_node);
127 PG_FUNCTION_INFO_V1(master_add_inactive_node);
128 PG_FUNCTION_INFO_V1(citus_add_secondary_node);
129 PG_FUNCTION_INFO_V1(master_add_secondary_node);
130 PG_FUNCTION_INFO_V1(citus_set_node_property);
131 PG_FUNCTION_INFO_V1(master_set_node_property);
132 PG_FUNCTION_INFO_V1(citus_remove_node);
133 PG_FUNCTION_INFO_V1(master_remove_node);
134 PG_FUNCTION_INFO_V1(citus_disable_node);
135 PG_FUNCTION_INFO_V1(master_disable_node);
136 PG_FUNCTION_INFO_V1(citus_activate_node);
137 PG_FUNCTION_INFO_V1(master_activate_node);
138 PG_FUNCTION_INFO_V1(citus_update_node);
139 PG_FUNCTION_INFO_V1(master_update_node);
140 PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
141 
142 
143 /*
144  * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
145  * sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK.
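 *
 * Callers typically adjust individual fields afterwards; for instance (an
 * illustrative sketch based on citus_set_coordinator_host below):
 *
 *   NodeMetadata nodeMetadata = DefaultNodeMetadata();
 *   nodeMetadata.groupId = 0;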
146  */
147 static NodeMetadata
148 DefaultNodeMetadata()
149 {
150 	NodeMetadata nodeMetadata;
151 
152 	/* ensure uninitialized padding doesn't escape the function */
153 	memset_struct_0(nodeMetadata);
154 	nodeMetadata.nodeRack = WORKER_DEFAULT_RACK;
155 	nodeMetadata.shouldHaveShards = true;
156 	nodeMetadata.groupId = INVALID_GROUP_ID;
157 
158 	return nodeMetadata;
159 }
160 
161 
162 /*
163  * citus_set_coordinator_host configures the hostname and port through which worker
164  * nodes can connect to the coordinator.
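 *
 * As an illustration only (hostname and port are placeholders, and the
 * remaining arguments are assumed to keep their SQL-level defaults), a call
 * might look like:
 *
 *   SELECT citus_set_coordinator_host('coordinator-host', 5432);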
165  */
166 Datum
167 citus_set_coordinator_host(PG_FUNCTION_ARGS)
168 {
169 	CheckCitusVersion(ERROR);
170 
171 	text *nodeName = PG_GETARG_TEXT_P(0);
172 	int32 nodePort = PG_GETARG_INT32(1);
173 	char *nodeNameString = text_to_cstring(nodeName);
174 
175 	NodeMetadata nodeMetadata = DefaultNodeMetadata();
176 	nodeMetadata.groupId = 0;
177 	nodeMetadata.shouldHaveShards = false;
178 	nodeMetadata.nodeRole = PG_GETARG_OID(2);
179 
180 	Name nodeClusterName = PG_GETARG_NAME(3);
181 	nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
182 
183 	/* prevent concurrent modification */
184 	LockRelationOid(DistNodeRelationId(), RowShareLock);
185 
186 	bool isCoordinatorInMetadata = false;
187 	WorkerNode *coordinatorNode = PrimaryNodeForGroup(COORDINATOR_GROUP_ID,
188 													  &isCoordinatorInMetadata);
189 	if (!isCoordinatorInMetadata)
190 	{
191 		bool nodeAlreadyExists = false;
192 
193 		/* add the coordinator to pg_dist_node if it was not already added */
194 		AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
195 						&nodeAlreadyExists);
196 
197 		/* we just checked */
198 		Assert(!nodeAlreadyExists);
199 	}
200 	else
201 	{
202 		/*
203 		 * since AddNodeMetadata takes an exclusive lock on pg_dist_node, we
204 		 * do not need to worry about concurrent changes (e.g. deletion) and
205 		 * can proceed to update immediately.
206 		 */
207 
208 		UpdateNodeLocation(coordinatorNode->nodeId, nodeNameString, nodePort);
209 
210 		/* clear cached plans that have the old host/port */
211 		ResetPlanCache();
212 	}
213 
214 	TransactionModifiedNodeMetadata = true;
215 
216 	PG_RETURN_VOID();
217 }
218 
219 
220 /*
221  * citus_add_node function adds a new node to the cluster and returns its id. It also
222  * replicates all reference tables to the new node.
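 *
 * For illustration only (hostname and port are placeholders, and the optional
 * arguments are assumed to keep their defaults), a typical call might be:
 *
 *   SELECT citus_add_node('worker-1', 5432);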
223  */
224 Datum
225 citus_add_node(PG_FUNCTION_ARGS)
226 {
227 	CheckCitusVersion(ERROR);
228 
229 	text *nodeName = PG_GETARG_TEXT_P(0);
230 	int32 nodePort = PG_GETARG_INT32(1);
231 	char *nodeNameString = text_to_cstring(nodeName);
232 
233 	NodeMetadata nodeMetadata = DefaultNodeMetadata();
234 	bool nodeAlreadyExists = false;
235 	nodeMetadata.groupId = PG_GETARG_INT32(2);
236 
237 	/*
238 	 * During tests this function is called before nodeRole and nodeCluster have been
239 	 * created.
240 	 */
241 	if (PG_NARGS() == 3)
242 	{
243 		nodeMetadata.nodeRole = InvalidOid;
244 		nodeMetadata.nodeCluster = "default";
245 	}
246 	else
247 	{
248 		Name nodeClusterName = PG_GETARG_NAME(4);
249 		nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
250 
251 		nodeMetadata.nodeRole = PG_GETARG_OID(3);
252 	}
253 
254 	if (nodeMetadata.groupId == COORDINATOR_GROUP_ID)
255 	{
256 		/* by default, we add the coordinator without shards */
257 		nodeMetadata.shouldHaveShards = false;
258 	}
259 
260 	int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
261 								 &nodeAlreadyExists);
262 	TransactionModifiedNodeMetadata = true;
263 
264 	/*
265 	 * After adding new node, if the node did not already exist, we will activate
266 	 * the node. This means we will replicate all reference tables to the new
267 	 * node.
268 	 */
269 	if (!nodeAlreadyExists)
270 	{
271 		ActivateNode(nodeNameString, nodePort);
272 	}
273 
274 	PG_RETURN_INT32(nodeId);
275 }
276 
277 
278 /*
279  * master_add_node is a wrapper function for old UDF name.
280  */
281 Datum
282 master_add_node(PG_FUNCTION_ARGS)
283 {
284 	return citus_add_node(fcinfo);
285 }
286 
287 
288 /*
289  * citus_add_inactive_node function adds a new node to the cluster as an inactive node
290  * and returns the id of the newly added node. It does not replicate reference
291  * tables to the new node; it only adds the new node to the pg_dist_node table.
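 *
 * For illustration only (hostname and port are placeholders, and the optional
 * arguments are assumed to keep their defaults), a typical call might be:
 *
 *   SELECT citus_add_inactive_node('worker-2', 5432);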
292  */
293 Datum
294 citus_add_inactive_node(PG_FUNCTION_ARGS)
295 {
296 	CheckCitusVersion(ERROR);
297 
298 	text *nodeName = PG_GETARG_TEXT_P(0);
299 	int32 nodePort = PG_GETARG_INT32(1);
300 	char *nodeNameString = text_to_cstring(nodeName);
301 	Name nodeClusterName = PG_GETARG_NAME(4);
302 
303 	NodeMetadata nodeMetadata = DefaultNodeMetadata();
304 	bool nodeAlreadyExists = false;
305 	nodeMetadata.groupId = PG_GETARG_INT32(2);
306 	nodeMetadata.nodeRole = PG_GETARG_OID(3);
307 	nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
308 
309 	if (nodeMetadata.groupId == COORDINATOR_GROUP_ID)
310 	{
311 		ereport(ERROR, (errmsg("coordinator node cannot be added as inactive node")));
312 	}
313 
314 	int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
315 								 &nodeAlreadyExists);
316 	TransactionModifiedNodeMetadata = true;
317 
318 	PG_RETURN_INT32(nodeId);
319 }
320 
321 
322 /*
323  * master_add_inactive_node is a wrapper function for old UDF name.
324  */
325 Datum
326 master_add_inactive_node(PG_FUNCTION_ARGS)
327 {
328 	return citus_add_inactive_node(fcinfo);
329 }
330 
331 
332 /*
333  * citus_add_secondary_node adds a new secondary node to the cluster. It accepts as
334  * arguments the primary node it should share a group with.
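 *
 * As an illustrative example (hostnames and ports are placeholders), a call
 * might look like:
 *
 *   SELECT citus_add_secondary_node('worker-1-follower', 5432, 'worker-1', 5432);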
335  */
336 Datum
337 citus_add_secondary_node(PG_FUNCTION_ARGS)
338 {
339 	CheckCitusVersion(ERROR);
340 
341 	text *nodeName = PG_GETARG_TEXT_P(0);
342 	int32 nodePort = PG_GETARG_INT32(1);
343 	char *nodeNameString = text_to_cstring(nodeName);
344 
345 	text *primaryName = PG_GETARG_TEXT_P(2);
346 	int32 primaryPort = PG_GETARG_INT32(3);
347 	char *primaryNameString = text_to_cstring(primaryName);
348 
349 	Name nodeClusterName = PG_GETARG_NAME(4);
350 	NodeMetadata nodeMetadata = DefaultNodeMetadata();
351 	bool nodeAlreadyExists = false;
352 
353 	nodeMetadata.groupId = GroupForNode(primaryNameString, primaryPort);
354 	nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
355 	nodeMetadata.nodeRole = SecondaryNodeRoleId();
356 	nodeMetadata.isActive = true;
357 
358 	int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
359 								 &nodeAlreadyExists);
360 	TransactionModifiedNodeMetadata = true;
361 
362 	PG_RETURN_INT32(nodeId);
363 }
364 
365 
366 /*
367  * master_add_secondary_node is a wrapper function for old UDF name.
368  */
369 Datum
370 master_add_secondary_node(PG_FUNCTION_ARGS)
371 {
372 	return citus_add_secondary_node(fcinfo);
373 }
374 
375 
376 /*
377  * citus_remove_node function removes the provided node from the pg_dist_node table of
378  * the master node and all nodes with metadata.
379  * The call to citus_remove_node should be made by a superuser, and the specified
380  * node should not have any active placements.
381  * This function also deletes all reference table placements belonging to the given node
382  * from pg_dist_placement, but it does not drop the actual placements at the node. In the case of
383  * re-adding the node, citus_add_node first drops and re-creates the reference tables.
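 *
 * As an illustrative example (hostname and port are placeholders), a call
 * might look like:
 *
 *   SELECT citus_remove_node('worker-1', 5432);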
384  */
385 Datum
386 citus_remove_node(PG_FUNCTION_ARGS)
387 {
388 	CheckCitusVersion(ERROR);
389 
390 	text *nodeNameText = PG_GETARG_TEXT_P(0);
391 	int32 nodePort = PG_GETARG_INT32(1);
392 
393 	RemoveNodeFromCluster(text_to_cstring(nodeNameText), nodePort);
394 	TransactionModifiedNodeMetadata = true;
395 
396 	PG_RETURN_VOID();
397 }
398 
399 
400 /*
401  * master_remove_node is a wrapper function for old UDF name.
402  */
403 Datum
404 master_remove_node(PG_FUNCTION_ARGS)
405 {
406 	return citus_remove_node(fcinfo);
407 }
408 
409 
410 /*
411  * citus_disable_node function sets the isactive value of the provided node to inactive
412  * on the coordinator and all nodes with metadata, regardless of whether the node has an
413  * active shard placement.
414  *
415  * The call to citus_disable_node must be made by a superuser.
416  *
417  * This function also deletes all reference table placements belonging to the given node
418  * from pg_dist_placement, but it does not drop the actual placements at the node. In the
419  * case of re-activating the node, citus_add_node first drops and re-creates the reference
420  * tables.
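 *
 * As an illustrative example (hostname and port are placeholders), a call
 * might look like:
 *
 *   SELECT citus_disable_node('worker-1', 5432);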
421  */
422 Datum
423 citus_disable_node(PG_FUNCTION_ARGS)
424 {
425 	text *nodeNameText = PG_GETARG_TEXT_P(0);
426 	int32 nodePort = PG_GETARG_INT32(1);
427 	char *nodeName = text_to_cstring(nodeNameText);
428 	WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
429 	bool isActive = false;
430 	bool onlyConsiderActivePlacements = false;
431 	MemoryContext savedContext = CurrentMemoryContext;
432 
433 	PG_TRY();
434 	{
435 		if (NodeIsPrimary(workerNode))
436 		{
437 			/*
438 			 * Delete reference table placements so they are not taken into account
439 			 * for the check if there are placements after this.
440 			 */
441 			DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId);
442 
443 			if (NodeGroupHasShardPlacements(workerNode->groupId,
444 											onlyConsiderActivePlacements))
445 			{
446 				ereport(NOTICE, (errmsg(
447 									 "Node %s:%d has active shard placements. Some queries "
448 									 "may fail after this operation. Use "
449 									 "SELECT citus_activate_node('%s', %d) to activate this "
450 									 "node again.",
451 									 workerNode->workerName, nodePort,
452 									 workerNode->workerName,
453 									 nodePort)));
454 			}
455 		}
456 
457 		SetNodeState(nodeName, nodePort, isActive);
458 		TransactionModifiedNodeMetadata = true;
459 	}
460 	PG_CATCH();
461 	{
462 		/* CopyErrorData() requires (CurrentMemoryContext != ErrorContext) */
463 		MemoryContextSwitchTo(savedContext);
464 		ErrorData *edata = CopyErrorData();
465 
466 		if (ClusterHasKnownMetadataWorkers())
467 		{
468 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
469 							errmsg("Disabling %s:%d failed", workerNode->workerName,
470 								   nodePort),
471 							errdetail("%s", edata->message),
472 							errhint(
473 								"If you are using MX, try stop_metadata_sync_to_node(hostname, port) "
474 								"for nodes that are down before disabling them.")));
475 		}
476 		else
477 		{
478 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
479 							errmsg("Disabling %s:%d failed", workerNode->workerName,
480 								   nodePort),
481 							errdetail("%s", edata->message)));
482 		}
483 	}
484 	PG_END_TRY();
485 
486 	PG_RETURN_VOID();
487 }
488 
489 
490 /*
491  * master_disable_node is a wrapper function for old UDF name.
492  */
493 Datum
494 master_disable_node(PG_FUNCTION_ARGS)
495 {
496 	return citus_disable_node(fcinfo);
497 }
498 
499 
500 /*
501  * citus_set_node_property sets a property of the node
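 *
 * For example, an illustrative call that stops new shards from being placed on
 * a node (hostname and port are placeholders) might be:
 *
 *   SELECT citus_set_node_property('worker-1', 5432, 'shouldhaveshards', false);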
502  */
503 Datum
504 citus_set_node_property(PG_FUNCTION_ARGS)
505 {
506 	text *nodeNameText = PG_GETARG_TEXT_P(0);
507 	int32 nodePort = PG_GETARG_INT32(1);
508 	text *propertyText = PG_GETARG_TEXT_P(2);
509 	bool value = PG_GETARG_BOOL(3);
510 
511 	WorkerNode *workerNode = ModifiableWorkerNode(text_to_cstring(nodeNameText),
512 												  nodePort);
513 
514 	if (strcmp(text_to_cstring(propertyText), "shouldhaveshards") == 0)
515 	{
516 		SetShouldHaveShards(workerNode, value);
517 	}
518 	else
519 	{
520 		ereport(ERROR, (errmsg(
521 							"only the 'shouldhaveshards' property can be set using this function"
522 							)));
523 	}
524 
525 	TransactionModifiedNodeMetadata = true;
526 
527 	PG_RETURN_VOID();
528 }
529 
530 
531 /*
532  * master_set_node_property is a wrapper function for old UDF name.
533  */
534 Datum
535 master_set_node_property(PG_FUNCTION_ARGS)
536 {
537 	return citus_set_node_property(fcinfo);
538 }
539 
540 
541 /*
542  * SetUpDistributedTableDependencies sets up the following on a node if it's
543  * a primary node that currently stores data:
544  * - All dependencies (e.g., types, schemas)
545  * - Reference tables, because they are needed to handle queries efficiently.
546  * - Distributed functions
547  *
548  * Note that we do not create the distributed dependencies on the coordinator
549  * since all the dependencies should be present in the coordinator already.
550  */
551 static void
552 SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
553 {
554 	if (NodeIsPrimary(newWorkerNode))
555 	{
556 		EnsureNoModificationsHaveBeenDone();
557 
558 		if (ShouldPropagate() && !NodeIsCoordinator(newWorkerNode))
559 		{
560 			PropagateNodeWideObjects(newWorkerNode);
561 			ReplicateAllDependenciesToNode(newWorkerNode->workerName,
562 										   newWorkerNode->workerPort);
563 		}
564 		else if (!NodeIsCoordinator(newWorkerNode))
565 		{
566 			ereport(WARNING, (errmsg("citus.enable_object_propagation is off, not "
567 									 "creating distributed objects on worker"),
568 							  errdetail("distributed objects are only kept in sync when "
569 										"citus.enable_object_propagation is set to on. "
570 										"Newly activated nodes will not get these "
571 										"objects created")));
572 		}
573 
574 		if (ReplicateReferenceTablesOnActivate)
575 		{
576 			ReplicateAllReferenceTablesToNode(newWorkerNode->workerName,
577 											  newWorkerNode->workerPort);
578 		}
579 
580 		/*
581 		 * Let the maintenance daemon do the hard work of syncing the metadata.
582 		 * We prefer this because otherwise node activation might fail within
583 		 * transaction blocks.
584 		 */
585 		if (ClusterHasDistributedFunctionWithDistArgument())
586 		{
587 			SetWorkerColumnLocalOnly(newWorkerNode, Anum_pg_dist_node_hasmetadata,
588 									 BoolGetDatum(true));
589 			TriggerMetadataSyncOnCommit();
590 		}
591 	}
592 }
593 
594 
595 /*
596  * PropagateNodeWideObjects is called during node activation to propagate any object that
597  * should be propagated for every node. These are generally not linked to any distributed
598  * object but change system wide behaviour.
599  */
600 static void
601 PropagateNodeWideObjects(WorkerNode *newWorkerNode)
602 {
603 	/* collect all commands */
604 	List *ddlCommands = NIL;
605 
606 	if (EnableAlterRoleSetPropagation)
607 	{
608 		/*
609 		 * Get commands for database and postgres wide settings. Since these settings are not
610 		 * linked to any role that can be distributed, we need to distribute them separately.
611 		 */
612 		List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid);
613 		ddlCommands = list_concat(ddlCommands, alterRoleSetCommands);
614 	}
615 
616 	if (list_length(ddlCommands) > 0)
617 	{
618 		/* if there are commands, wrap them in enable_ddl_propagation off */
619 		ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands);
620 		ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
621 
622 		/* send commands to new workers */
623 		SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName,
624 												  newWorkerNode->workerPort,
625 												  CitusExtensionOwnerName(),
626 												  ddlCommands);
627 	}
628 }
629 
630 
631 /*
632  * ModifiableWorkerNode gets the requested WorkerNode and also gets locks
633  * required for modifying it. This fails if the node does not exist.
634  */
635 static WorkerNode *
636 ModifiableWorkerNode(const char *nodeName, int32 nodePort)
637 {
638 	CheckCitusVersion(ERROR);
639 	EnsureCoordinator();
640 
641 	/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
642 	LockRelationOid(DistNodeRelationId(), ExclusiveLock);
643 
644 	WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
645 	if (workerNode == NULL)
646 	{
647 		ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
648 	}
649 
650 	return workerNode;
651 }
652 
653 
654 /*
655  * citus_activate_node UDF activates the given node. It sets the node's isactive
656  * value to active and replicates all reference tables to that node.
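 *
 * As an illustrative example (hostname and port are placeholders), a call
 * might look like:
 *
 *   SELECT citus_activate_node('worker-1', 5432);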
657  */
658 Datum
659 citus_activate_node(PG_FUNCTION_ARGS)
660 {
661 	text *nodeNameText = PG_GETARG_TEXT_P(0);
662 	int32 nodePort = PG_GETARG_INT32(1);
663 
664 	WorkerNode *workerNode = ModifiableWorkerNode(text_to_cstring(nodeNameText),
665 												  nodePort);
666 	ActivateNode(workerNode->workerName, workerNode->workerPort);
667 
668 	TransactionModifiedNodeMetadata = true;
669 
670 	PG_RETURN_INT32(workerNode->nodeId);
671 }
672 
673 
674 /*
675  * master_activate_node is a wrapper function for old UDF name.
676  */
677 Datum
678 master_activate_node(PG_FUNCTION_ARGS)
679 {
680 	return citus_activate_node(fcinfo);
681 }
682 
683 
684 /*
685  * GroupForNode returns the group which a given node belongs to.
686  *
687  * It only works if the requested node is a part of CurrentCluster.
688  */
689 uint32
690 GroupForNode(char *nodeName, int nodePort)
691 {
692 	WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);
693 
694 	if (workerNode == NULL)
695 	{
696 		ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
697 	}
698 
699 	return workerNode->groupId;
700 }
701 
702 
703 /*
704  * NodeIsPrimaryAndRemote returns whether the argument represents a primary
705  * node other than the local node.
706  */
707 bool
708 NodeIsPrimaryAndRemote(WorkerNode *worker)
709 {
710 	return NodeIsPrimary(worker) && !NodeIsLocal(worker);
711 }
712 
713 
714 /*
715  * NodeIsPrimary returns whether the argument represents a primary node.
716  */
717 bool
718 NodeIsPrimary(WorkerNode *worker)
719 {
720 	Oid primaryRole = PrimaryNodeRoleId();
721 
722 	/* if nodeRole does not yet exist, all nodes are primary nodes */
723 	if (primaryRole == InvalidOid)
724 	{
725 		return true;
726 	}
727 
728 	return worker->nodeRole == primaryRole;
729 }
730 
731 
732 /*
733  * NodeIsLocal returns whether the argument represents the local node.
734  */
735 static bool
736 NodeIsLocal(WorkerNode *worker)
737 {
738 	return worker->groupId == GetLocalGroupId();
739 }
740 
741 
742 /*
743  * NodeIsSecondary returns whether the argument represents a secondary node.
744  */
745 bool
746 NodeIsSecondary(WorkerNode *worker)
747 {
748 	Oid secondaryRole = SecondaryNodeRoleId();
749 
750 	/* if nodeRole does not yet exist, all nodes are primary nodes */
751 	if (secondaryRole == InvalidOid)
752 	{
753 		return false;
754 	}
755 
756 	return worker->nodeRole == secondaryRole;
757 }
758 
759 
760 /*
761  * NodeIsReadable returns whether we're allowed to send SELECT queries to this
762  * node.
763  */
764 bool
765 NodeIsReadable(WorkerNode *workerNode)
766 {
767 	if (ReadFromSecondaries == USE_SECONDARY_NODES_NEVER &&
768 		NodeIsPrimary(workerNode))
769 	{
770 		return true;
771 	}
772 
773 	if (ReadFromSecondaries == USE_SECONDARY_NODES_ALWAYS &&
774 		NodeIsSecondary(workerNode))
775 	{
776 		return true;
777 	}
778 
779 	return false;
780 }
781 
782 
783 /*
784  * PrimaryNodeForGroup returns the (unique) primary in the specified group.
785  *
786  * If there are any nodes in the requested group and groupContainsNodes is not NULL,
787  * it will set the bool that groupContainsNodes references to true.
788  */
789 WorkerNode *
790 PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes)
791 {
792 	WorkerNode *workerNode = NULL;
793 	HASH_SEQ_STATUS status;
794 	HTAB *workerNodeHash = GetWorkerNodeHash();
795 
796 	hash_seq_init(&status, workerNodeHash);
797 
798 	while ((workerNode = hash_seq_search(&status)) != NULL)
799 	{
800 		int32 workerNodeGroupId = workerNode->groupId;
801 		if (workerNodeGroupId != groupId)
802 		{
803 			continue;
804 		}
805 
806 		if (groupContainsNodes != NULL)
807 		{
808 			*groupContainsNodes = true;
809 		}
810 
811 		if (NodeIsPrimary(workerNode))
812 		{
813 			hash_seq_term(&status);
814 			return workerNode;
815 		}
816 	}
817 
818 	return NULL;
819 }
820 
821 
822 /*
823  * ActivateNode activates the node with nodeName and nodePort. Currently, activation
824  * includes only replicating the reference tables and setting isactive column of the
825  * given node.
826  */
827 static int
828 ActivateNode(char *nodeName, int nodePort)
829 {
830 	bool isActive = true;
831 
832 	/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
833 	LockRelationOid(DistNodeRelationId(), ExclusiveLock);
834 
835 	WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive);
836 
837 	SetUpDistributedTableDependencies(newWorkerNode);
838 	return newWorkerNode->nodeId;
839 }
840 
841 
842 /*
843  * citus_update_node moves the requested node to a different nodename and nodeport. It
844  * takes locks to ensure no queries are running concurrently, and is intended for customers
845  * who are running their own failover solution.
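 *
 * For illustration only, a typical call during a failover (node id, hostname
 * and port are placeholders; the optional force and lock_cooldown arguments
 * are assumed to keep their defaults) might be:
 *
 *   SELECT citus_update_node(3, 'worker-1-standby', 5432);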
846  */
847 Datum
848 citus_update_node(PG_FUNCTION_ARGS)
849 {
850 	CheckCitusVersion(ERROR);
851 
852 	int32 nodeId = PG_GETARG_INT32(0);
853 
854 	text *newNodeName = PG_GETARG_TEXT_P(1);
855 	int32 newNodePort = PG_GETARG_INT32(2);
856 
857 	/*
858 	 * force is used when an update needs to happen regardless of conflicting locks. This
859 	 * feature is important to force the update during a failover due to failure, e.g. by
860 	 * a high-availability system such as pg_auto_failover. The strategy is to start a
861 	 * background worker that actively cancels backends holding conflicting locks with
862 	 * this backend.
863 	 *
864 	 * Defaults to false
865 	 */
866 	bool force = PG_GETARG_BOOL(3);
867 	int32 lock_cooldown = PG_GETARG_INT32(4);
868 
869 	char *newNodeNameString = text_to_cstring(newNodeName);
870 	List *placementList = NIL;
871 	BackgroundWorkerHandle *handle = NULL;
872 
873 	WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString,
874 																	 newNodePort);
875 	if (workerNodeWithSameAddress != NULL)
876 	{
877 		/* a node with the given hostname and port already exists in the metadata */
878 
879 		if (workerNodeWithSameAddress->nodeId == nodeId)
880 		{
881 			/* it's the node itself, meaning this is a noop update */
882 			PG_RETURN_VOID();
883 		}
884 		else
885 		{
886 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
887 							errmsg("there is already another node with the specified "
888 								   "hostname and port")));
889 		}
890 	}
891 
892 	WorkerNode *workerNode = LookupNodeByNodeId(nodeId);
893 	if (workerNode == NULL)
894 	{
895 		ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND),
896 						errmsg("node %u not found", nodeId)));
897 	}
898 
899 
900 	/*
901 	 * If the node is a primary node we block reads and writes.
902 	 *
903 	 * This lock has two purposes:
904 	 *
905 	 * - Ensure buggy code in Citus doesn't cause failures when the
906 	 *   nodename/nodeport of a node changes mid-query
907 	 *
908 	 * - Provide fencing during failover, after this function returns all
909 	 *   connections will use the new node location.
910 	 *
911 	 * Drawback:
912 	 *
913 	 * - This function blocks until all previous queries have finished. This
914 	 *   means that long-running queries will prevent failover.
915 	 *
916 	 *   In case of node failure said long-running queries will fail in the end
917 	 *   anyway as they will be unable to commit successfully on the failed
918 	 *   machine. To cause quick failure of these queries use force => true
919 	 *   during the invocation of citus_update_node to terminate conflicting
920 	 *   backends proactively.
921 	 *
922 	 * It might be worth blocking reads to a secondary for the same reasons,
923 	 * though we currently only query secondaries on follower clusters
924 	 * where these locks will have no effect.
925 	 */
926 	if (NodeIsPrimary(workerNode))
927 	{
928 		/*
929 		 * before acquiring the locks check if we want a background worker to help us to
930 		 * aggressively obtain the locks.
931 		 */
932 		if (force)
933 		{
934 			handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
935 			if (!handle)
936 			{
937 				/*
938 				 * We failed to start a background worker, which probably means that we exceeded
939 				 * max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
940 				 * to repeatedly throw an error because if citus_update_node is called to complete a
941 				 * failover then finishing is the only way to bring the cluster back up. Therefore we
942 				 * give up on killing other backends and simply wait for the lock. We do set
943 				 * lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
944 				 */
945 				SetLockTimeoutLocally(lock_cooldown);
946 				ereport(WARNING, (errmsg(
947 									  "could not start background worker to kill backends with conflicting"
948 									  " locks to force the update. Degrading to acquiring locks "
949 									  "with a lock time out."),
950 								  errhint(
951 									  "Increasing max_worker_processes might help.")));
952 			}
953 		}
954 
955 		placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
956 		LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock);
957 	}
958 
959 	/*
960 	 * if we have planned statements such as prepared statements, we should clear the cache so that
961 	 * the plan cache doesn't return the old nodename/nodeport.
962 	 */
963 	ResetPlanCache();
964 
965 	UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
966 
967 	/* we should be able to find the new node from the metadata */
968 	workerNode = FindWorkerNode(newNodeNameString, newNodePort);
969 	Assert(workerNode->nodeId == nodeId);
970 
971 	/*
972 	 * Propagate the updated pg_dist_node entry to all metadata workers.
973 	 * citus-ha uses citus_update_node() in a prepared transaction, and
974 	 * we don't support coordinated prepared transactions, so we cannot
975 	 * propagate the changes to the worker nodes here. Instead we mark
976 	 * all metadata nodes as not-synced and ask maintenanced to do the
977 	 * propagation.
978 	 *
979 	 * It is possible that maintenance daemon does the first resync too
980 	 * early, but that's fine, since this will start a retry loop with
981 	 * 5 second intervals until sync is complete.
982 	 */
983 	if (UnsetMetadataSyncedForAll())
984 	{
985 		TriggerMetadataSyncOnCommit();
986 	}
987 
988 	if (handle != NULL)
989 	{
990 		/*
991 		 * this will be called on memory context cleanup as well, if the worker has been
992 		 * terminated already this will be a noop
993 		 */
994 		TerminateBackgroundWorker(handle);
995 	}
996 
997 	TransactionModifiedNodeMetadata = true;
998 
999 	PG_RETURN_VOID();
1000 }
1001 
1002 
1003 /*
1004  * master_update_node is a wrapper function for old UDF name.
1005  */
1006 Datum
1007 master_update_node(PG_FUNCTION_ARGS)
1008 {
1009 	return citus_update_node(fcinfo);
1010 }
1011 
1012 
1013 /*
1014  * SetLockTimeoutLocally sets the lock_timeout to the given value.
1015  * This setting is local.
1016  */
1017 static void
1018 SetLockTimeoutLocally(int32 lockCooldown)
1019 {
1020 	set_config_option("lock_timeout", ConvertIntToString(lockCooldown),
1021 					  (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
1022 					  GUC_ACTION_LOCAL, true, 0, false);
1023 }
1024 
1025 
1026 static void
1027 UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
1028 {
1029 	const bool indexOK = true;
1030 
1031 	ScanKeyData scanKey[1];
1032 	Datum values[Natts_pg_dist_node];
1033 	bool isnull[Natts_pg_dist_node];
1034 	bool replace[Natts_pg_dist_node];
1035 
1036 	Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
1037 	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
1038 
1039 	ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid,
1040 				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodeId));
1041 
1042 	SysScanDesc scanDescriptor = systable_beginscan(pgDistNode, DistNodeNodeIdIndexId(),
1043 													indexOK,
1044 													NULL, 1, scanKey);
1045 
1046 	HeapTuple heapTuple = systable_getnext(scanDescriptor);
1047 	if (!HeapTupleIsValid(heapTuple))
1048 	{
1049 		ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
1050 							   newNodeName, newNodePort)));
1051 	}
1052 
1053 	memset(replace, 0, sizeof(replace));
1054 
1055 	values[Anum_pg_dist_node_nodeport - 1] = Int32GetDatum(newNodePort);
1056 	isnull[Anum_pg_dist_node_nodeport - 1] = false;
1057 	replace[Anum_pg_dist_node_nodeport - 1] = true;
1058 
1059 	values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(newNodeName);
1060 	isnull[Anum_pg_dist_node_nodename - 1] = false;
1061 	replace[Anum_pg_dist_node_nodename - 1] = true;
1062 
1063 	heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
1064 
1065 	CatalogTupleUpdate(pgDistNode, &heapTuple->t_self, heapTuple);
1066 
1067 	CitusInvalidateRelcacheByRelid(DistNodeRelationId());
1068 
1069 	CommandCounterIncrement();
1070 
1071 	systable_endscan(scanDescriptor);
1072 	table_close(pgDistNode, NoLock);
1073 }
1074 
1075 
1076 /*
1077  * get_shard_id_for_distribution_column function takes a distributed table name and a
1078  * distribution value, then returns the id of the shard which belongs to the given table
1079  * and contains the given value. It works for hash- and range-distributed tables as well as reference tables.
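 *
 * As an illustrative example (table name and value are placeholders), a call
 * might look like:
 *
 *   SELECT get_shard_id_for_distribution_column('my_table', 'my_key');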
1080  */
1081 Datum
1082 get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
1083 {
1084 	CheckCitusVersion(ERROR);
1085 
1086 	ShardInterval *shardInterval = NULL;
1087 
1088 	/*
1089 	 * To allow the optional parameter to be NULL, we defined this UDF as non-strict; therefore
1090 	 * we need to check all parameters for NULL values.
1091 	 */
1092 	if (PG_ARGISNULL(0))
1093 	{
1094 		ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1095 						errmsg("relation cannot be NULL")));
1096 	}
1097 
1098 	Oid relationId = PG_GETARG_OID(0);
1099 	EnsureTablePermissions(relationId, ACL_SELECT);
1100 
1101 	if (!IsCitusTable(relationId))
1102 	{
1103 		ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1104 						errmsg("relation is not distributed")));
1105 	}
1106 
1107 	if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
1108 	{
1109 		List *shardIntervalList = LoadShardIntervalList(relationId);
1110 		if (shardIntervalList == NIL)
1111 		{
1112 			PG_RETURN_INT64(0);
1113 		}
1114 
1115 		shardInterval = (ShardInterval *) linitial(shardIntervalList);
1116 	}
1117 	else if (IsCitusTableType(relationId, HASH_DISTRIBUTED) ||
1118 			 IsCitusTableType(relationId, RANGE_DISTRIBUTED))
1119 	{
1120 		CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
1121 
1122 		/* if given table is not reference table, distributionValue cannot be NULL */
1123 		if (PG_ARGISNULL(1))
1124 		{
1125 			ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1126 							errmsg("distribution value cannot be NULL for tables other "
1127 								   "than reference tables.")));
1128 		}
1129 
1130 		Datum inputDatum = PG_GETARG_DATUM(1);
1131 		Oid inputDataType = get_fn_expr_argtype(fcinfo->flinfo, 1);
1132 		char *distributionValueString = DatumToString(inputDatum, inputDataType);
1133 
1134 		Var *distributionColumn = DistPartitionKeyOrError(relationId);
1135 		Oid distributionDataType = distributionColumn->vartype;
1136 
1137 		Datum distributionValueDatum = StringToDatum(distributionValueString,
1138 													 distributionDataType);
1139 
1140 		shardInterval = FindShardInterval(distributionValueDatum, cacheEntry);
1141 	}
1142 	else
1143 	{
1144 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1145 						errmsg("finding shard id of given distribution value is only "
1146 							   "supported for hash partitioned tables, range partitioned "
1147 							   "tables and reference tables.")));
1148 	}
1149 
1150 	if (shardInterval != NULL)
1151 	{
1152 		PG_RETURN_INT64(shardInterval->shardId);
1153 	}
1154 
1155 	PG_RETURN_INT64(0);
1156 }
1157 
1158 
1159 /*
1160  * FindWorkerNode searches over the worker nodes and returns the workerNode
1161  * if it already exists. Else, the function returns NULL.
1162  */
1163 WorkerNode *
1164 FindWorkerNode(const char *nodeName, int32 nodePort)
1165 {
1166 	HTAB *workerNodeHash = GetWorkerNodeHash();
1167 	bool handleFound = false;
1168 
1169 	WorkerNode *searchedNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
1170 	strlcpy(searchedNode->workerName, nodeName, WORKER_LENGTH);
1171 	searchedNode->workerPort = nodePort;
1172 
1173 	void *hashKey = (void *) searchedNode;
1174 	WorkerNode *cachedWorkerNode = (WorkerNode *) hash_search(workerNodeHash, hashKey,
1175 															  HASH_FIND,
1176 															  &handleFound);
1177 	if (handleFound)
1178 	{
1179 		WorkerNode *workerNode = (WorkerNode *) palloc(sizeof(WorkerNode));
1180 		*workerNode = *cachedWorkerNode;
1181 		return workerNode;
1182 	}
1183 
1184 	return NULL;
1185 }
1186 
1187 
1188 /*
1189  * FindWorkerNodeOrError searches over the worker nodes and returns the workerNode
1190  * if it exists; otherwise it errors out.
1191  */
1192 WorkerNode *
1193 FindWorkerNodeOrError(const char *nodeName, int32 nodePort)
1194 {
1195 	WorkerNode *node = FindWorkerNode(nodeName, nodePort);
1196 	if (node == NULL)
1197 	{
1198 		ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND),
1199 						errmsg("node %s:%d not found", nodeName, nodePort)));
1200 	}
1201 	return node;
1202 }
1203 
1204 
1205 /*
1206  * FindWorkerNodeAnyCluster returns the workerNode no matter which cluster it is a part
1207  * of. FindWorkerNode, like almost every other function, acts as if nodes in other
1208  * clusters do not exist.
1209  */
1210 WorkerNode *
1211 FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
1212 {
1213 	WorkerNode *workerNode = NULL;
1214 
1215 	Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
1216 	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
1217 
1218 	HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
1219 	if (heapTuple != NULL)
1220 	{
1221 		workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
1222 	}
1223 
1224 	table_close(pgDistNode, NoLock);
1225 	return workerNode;
1226 }
1227 
1228 
1229 /*
1230  * ReadDistNode iterates over the pg_dist_node table, converts each row
1231  * into its memory representation (i.e., WorkerNode) and adds them into
1232  * a list. Lastly, the list is returned to the caller.
1233  *
1234  * It skips nodes which are not in the current cluster unless requested to do otherwise
1235  * by includeNodesFromOtherClusters.
1236  */
1237 List *
1238 ReadDistNode(bool includeNodesFromOtherClusters)
1239 {
1240 	ScanKeyData scanKey[1];
1241 	int scanKeyCount = 0;
1242 	List *workerNodeList = NIL;
1243 
1244 	Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
1245 
1246 	SysScanDesc scanDescriptor = systable_beginscan(pgDistNode,
1247 													InvalidOid, false,
1248 													NULL, scanKeyCount, scanKey);
1249 
1250 	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
1251 
1252 	HeapTuple heapTuple = systable_getnext(scanDescriptor);
1253 	while (HeapTupleIsValid(heapTuple))
1254 	{
1255 		WorkerNode *workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
1256 
1257 		if (includeNodesFromOtherClusters ||
1258 			strncmp(workerNode->nodeCluster, CurrentCluster, WORKER_LENGTH) == 0)
1259 		{
1260 			/* the coordinator acts as if it never sees nodes not in its cluster */
1261 			workerNodeList = lappend(workerNodeList, workerNode);
1262 		}
1263 
1264 		heapTuple = systable_getnext(scanDescriptor);
1265 	}
1266 
1267 	systable_endscan(scanDescriptor);
1268 	table_close(pgDistNode, NoLock);
1269 
1270 	return workerNodeList;
1271 }
1272 
1273 
1274 /*
1275  * RemoveNodeFromCluster removes the provided node from the pg_dist_node table of
1276  * the master node and all nodes with metadata.
1277  * The call to master_remove_node should be made by a superuser. If there are
1278  * active shard placements on the node, the function errors out.
1279  * This function also deletes all reference table placements belonging to the given node
1280  * from pg_dist_placement, but it does not drop the actual placements at the node. It also
1281  * modifies replication factor of the colocation group of reference tables, so that
1282  * replication factor will be equal to worker count.
1283  */
1284 static void
1285 RemoveNodeFromCluster(char *nodeName, int32 nodePort)
1286 {
1287 	WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
1288 	if (NodeIsPrimary(workerNode))
1289 	{
1290 		if (CanRemoveReferenceTablePlacements())
1291 		{
1292 			/*
1293 			 * Delete reference table placements so they are not taken into account
1294 			 * for the check if there are placements after this.
1295 			 */
1296 			DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId);
1297 		}
1298 		if (NodeGroupHasLivePlacements(workerNode->groupId))
1299 		{
1300 			if (ActivePrimaryNodeCount() == 1 && ClusterHasReferenceTable())
1301 			{
1302 				ereport(ERROR, (errmsg(
1303 									"cannot remove the last worker node because there are reference "
1304 									"tables and it would cause data loss on reference tables"),
1305 								errhint(
1306 									"To proceed, either drop the reference tables or use "
1307 									"undistribute_table() function to convert them to local tables")));
1308 			}
1309 			ereport(ERROR, (errmsg("cannot remove the primary node of a node group "
1310 								   "which has shard placements"),
1311 							errhint(
1312 								"To proceed, either drop the distributed tables or use "
1313 								"undistribute_table() function to convert them to local tables")));
1314 		}
1315 
1316 		/*
1317 		 * Secondary nodes are read-only, never 2PC is used.
1318 		 * Hence, no items can be inserted to pg_dist_transaction for secondary nodes.
1319 		 */
1320 		DeleteWorkerTransactions(workerNode);
1321 	}
1322 
1323 	DeleteNodeRow(workerNode->workerName, nodePort);
1324 
1325 	RemoveOldShardPlacementForNodeGroup(workerNode->groupId);
1326 
1327 	char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
1328 
1329 	/* make sure we don't have any lingering session lifespan connections */
1330 	CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort);
1331 
1332 	SendCommandToWorkersWithMetadata(nodeDeleteCommand);
1333 }
1334 
1335 
1336 /*
1337  * RemoveOldShardPlacementForNodeGroup removes all old shard placements
1338  * for the given node group from pg_dist_placement.
1339  */
1340 static void
1341 RemoveOldShardPlacementForNodeGroup(int groupId)
1342 {
1343 	/*
1344 	 * Prevent concurrent deferred drop
1345 	 */
1346 	LockPlacementCleanup();
1347 	List *shardPlacementsOnNode = AllShardPlacementsOnNodeGroup(groupId);
1348 	GroupShardPlacement *placement = NULL;
1349 	foreach_ptr(placement, shardPlacementsOnNode)
1350 	{
1351 		if (placement->shardState == SHARD_STATE_TO_DELETE)
1352 		{
1353 			DeleteShardPlacementRow(placement->placementId);
1354 		}
1355 	}
1356 }
1357 
1358 
1359 /*
1360  * CanRemoveReferenceTablePlacements returns true if active primary
1361  * node count is more than 1, which means that even if we remove a node
1362  * we will still have some other node that has reference table placement.
1363  */
1364 static bool
1365 CanRemoveReferenceTablePlacements(void)
1366 {
1367 	return ActivePrimaryNodeCount() > 1;
1368 }
1369 
1370 
1371 /* CountPrimariesWithMetadata returns the number of primary nodes which have metadata. */
1372 uint32
1373 CountPrimariesWithMetadata(void)
1374 {
1375 	uint32 primariesWithMetadata = 0;
1376 	WorkerNode *workerNode = NULL;
1377 
1378 	HASH_SEQ_STATUS status;
1379 	HTAB *workerNodeHash = GetWorkerNodeHash();
1380 
1381 	hash_seq_init(&status, workerNodeHash);
1382 
1383 	while ((workerNode = hash_seq_search(&status)) != NULL)
1384 	{
1385 		if (workerNode->hasMetadata && NodeIsPrimary(workerNode))
1386 		{
1387 			primariesWithMetadata++;
1388 		}
1389 	}
1390 
1391 	return primariesWithMetadata;
1392 }
1393 
1394 
1395 /*
1396  * AddNodeMetadata checks the given node information and adds the specified node to the
1397  * pg_dist_node table of the master and workers with metadata.
1398  * If the node already exists, the function returns the id of the node.
1399  * If not, the following procedure is followed while adding a node: If the groupId is not
1400  * explicitly given by the user, the function picks the group that the new node should
1401  * be in with respect to GroupSize. Then, the new node is inserted into the local
1402  * pg_dist_node as well as the nodes with hasmetadata=true.
1403  */
1404 static int
1405 AddNodeMetadata(char *nodeName, int32 nodePort,
1406 				NodeMetadata *nodeMetadata,
1407 				bool *nodeAlreadyExists)
1408 {
1409 	EnsureCoordinator();
1410 
1411 	*nodeAlreadyExists = false;
1412 
1413 	/*
1414 	 * Prevent / wait for concurrent modification before checking whether
1415 	 * the worker already exists in pg_dist_node.
1416 	 */
1417 	LockRelationOid(DistNodeRelationId(), RowShareLock);
1418 
1419 	WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
1420 	if (workerNode != NULL)
1421 	{
1422 		/* return early without holding locks when the node already exists */
1423 		*nodeAlreadyExists = true;
1424 
1425 		return workerNode->nodeId;
1426 	}
1427 
1428 	/*
1429 	 * We are going to change pg_dist_node, prevent any concurrent reads that
1430 	 * are not tolerant to concurrent node addition by taking an exclusive
1431 	 * lock (conflicts with all but AccessShareLock).
1432 	 *
1433 	 * We may want to relax or have more fine-grained locking in the future
1434 	 * to allow users to add multiple nodes concurrently.
1435 	 */
1436 	LockRelationOid(DistNodeRelationId(), ExclusiveLock);
1437 
1438 	/* recheck in case 2 node additions pass the first check concurrently */
1439 	workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
1440 	if (workerNode != NULL)
1441 	{
1442 		*nodeAlreadyExists = true;
1443 
1444 		return workerNode->nodeId;
1445 	}
1446 
1447 	if (nodeMetadata->groupId != COORDINATOR_GROUP_ID &&
1448 		strcmp(nodeName, "localhost") != 0)
1449 	{
1450 		/*
1451 		 * The user is trying to add a worker with a non-localhost address. If the coordinator
1452 		 * is registered as "localhost", the worker won't be able to connect to it.
1453 		 */
1454 
1455 		bool isCoordinatorInMetadata = false;
1456 		WorkerNode *coordinatorNode = PrimaryNodeForGroup(COORDINATOR_GROUP_ID,
1457 														  &isCoordinatorInMetadata);
1458 		if (isCoordinatorInMetadata &&
1459 			strcmp(coordinatorNode->workerName, "localhost") == 0)
1460 		{
1461 			ereport(ERROR, (errmsg("cannot add a worker node when the coordinator "
1462 								   "hostname is set to localhost"),
1463 							errdetail("Worker nodes need to be able to connect to the "
1464 									  "coordinator to transfer data."),
1465 							errhint("Use SELECT citus_set_coordinator_host('<hostname>') "
1466 									"to configure the coordinator hostname")));
1467 		}
1468 	}
1469 
1470 	/*
1471 	 * When adding the first worker when the coordinator has shard placements,
1472 	 * print a notice on how to drain the coordinator.
1473 	 */
1474 	if (nodeMetadata->groupId != COORDINATOR_GROUP_ID && CoordinatorAddedAsWorkerNode() &&
1475 		ActivePrimaryNonCoordinatorNodeCount() == 0 &&
1476 		NodeGroupHasShardPlacements(COORDINATOR_GROUP_ID, true))
1477 	{
1478 		WorkerNode *coordinator = CoordinatorNodeIfAddedAsWorkerOrError();
1479 
1480 		ereport(NOTICE, (errmsg("shards are still on the coordinator after adding the "
1481 								"new node"),
1482 						 errhint("Use SELECT rebalance_table_shards(); to balance "
1483 								 "shards data between workers and coordinator or "
1484 								 "SELECT citus_drain_node(%s,%d); to permanently "
1485 								 "move shards away from the coordinator.",
1486 								 quote_literal_cstr(coordinator->workerName),
1487 								 coordinator->workerPort)));
1488 	}
1489 
1490 	/* the user lets Citus decide which group the newly added node should be in */
1491 	if (nodeMetadata->groupId == INVALID_GROUP_ID)
1492 	{
1493 		nodeMetadata->groupId = GetNextGroupId();
1494 	}
1495 
1496 	if (nodeMetadata->groupId == COORDINATOR_GROUP_ID)
1497 	{
1498 		/*
1499 		 * The coordinator always has the authoritative metadata; reflect this
1500 		 * fact in pg_dist_node.
1501 		 */
1502 		nodeMetadata->hasMetadata = true;
1503 		nodeMetadata->metadataSynced = true;
1504 
1505 		/*
1506 		 * There is no concept of "inactive" coordinator, so hard code it.
1507 		 */
1508 		nodeMetadata->isActive = true;
1509 	}
1510 
1511 	/* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */
1512 	if (nodeMetadata->nodeRole != InvalidOid && nodeMetadata->nodeRole ==
1513 		PrimaryNodeRoleId())
1514 	{
1515 		WorkerNode *existingPrimaryNode = PrimaryNodeForGroup(nodeMetadata->groupId,
1516 															  NULL);
1517 
1518 		if (existingPrimaryNode != NULL)
1519 		{
1520 			ereport(ERROR, (errmsg("group %d already has a primary node",
1521 								   nodeMetadata->groupId)));
1522 		}
1523 	}
1524 
1525 	if (nodeMetadata->nodeRole == PrimaryNodeRoleId())
1526 	{
1527 		if (strncmp(nodeMetadata->nodeCluster,
1528 					WORKER_DEFAULT_CLUSTER,
1529 					WORKER_LENGTH) != 0)
1530 		{
1531 			ereport(ERROR, (errmsg("primaries must be added to the default cluster")));
1532 		}
1533 	}
1534 
1535 	/* generate the new node id from the sequence */
1536 	int nextNodeIdInt = GetNextNodeId();
1537 
1538 	InsertNodeRow(nextNodeIdInt, nodeName, nodePort, nodeMetadata);
1539 
1540 	workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
1541 
1542 	/* send the delete command to all primary nodes with metadata */
1543 	char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
1544 	SendCommandToWorkersWithMetadata(nodeDeleteCommand);
1545 
1546 	/* finally prepare the insert command and send it to all primary nodes */
1547 	uint32 primariesWithMetadata = CountPrimariesWithMetadata();
1548 	if (primariesWithMetadata != 0)
1549 	{
1550 		List *workerNodeList = list_make1(workerNode);
1551 		char *nodeInsertCommand = NodeListInsertCommand(workerNodeList);
1552 
1553 		SendCommandToWorkersWithMetadata(nodeInsertCommand);
1554 	}
1555 
1556 	return workerNode->nodeId;
1557 }
1558 
1559 
1560 /*
1561  * SetWorkerColumn function sets the column with the specified index
1562  * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly.
1563  * It also sends the same command for node update to other metadata nodes.
1564  * If anything fails during the transaction, we roll it back.
1565  * Returns the new worker node after the modification.
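 *
 * As an illustrative sketch (mirroring how SetWorkerColumnLocalOnly is used
 * elsewhere in this file), a caller might mark a node as having metadata with:
 *
 *   workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata,
 *                                BoolGetDatum(true));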
1566  */
1567 WorkerNode *
1568 SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
1569 {
1570 	workerNode = SetWorkerColumnLocalOnly(workerNode, columnIndex, value);
1571 
1572 	char *metadataSyncCommand = GetMetadataSyncCommandToSetNodeColumn(workerNode,
1573 																	  columnIndex,
1574 																	  value);
1575 
1576 	SendCommandToWorkersWithMetadata(metadataSyncCommand);
1577 
1578 	return workerNode;
1579 }
1580 
1581 
1582 /*
1583  * SetWorkerColumnOptional function sets the column with the specified index
1584  * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly.
1585  * It also sends the same command optionally for node update to other metadata nodes,
1586  * meaning that failures are ignored. Returns the new worker node after the modification.
1587  */
1588 WorkerNode *
1589 SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value)
1590 {
1591 	char *metadataSyncCommand = GetMetadataSyncCommandToSetNodeColumn(workerNode,
1592 																	  columnIndex,
1593 																	  value);
1594 
1595 	List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
1596 												   ShareLock);
1597 
1598 	/* open connections in parallel */
1599 	WorkerNode *worker = NULL;
1600 	foreach_ptr(worker, workerNodeList)
1601 	{
1602 		bool success = SendOptionalCommandListToWorkerInCoordinatedTransaction(
1603 			worker->workerName, worker->workerPort,
1604 			CurrentUserName(),
1605 			list_make1(metadataSyncCommand));
1606 
1607 		if (!success)
1608 		{
1609 			/* metadata out of sync, mark the worker as not synced */
1610 			ereport(WARNING, (errmsg("Updating the metadata of the node %s:%d "
1611 									 "failed on node %s:%d. "
1612 									 "Metadata on %s:%d is marked as out of sync.",
1613 									 workerNode->workerName, workerNode->workerPort,
1614 									 worker->workerName, worker->workerPort,
1615 									 worker->workerName, worker->workerPort)));
1616 
1617 			SetWorkerColumnLocalOnly(worker, Anum_pg_dist_node_metadatasynced,
1618 									 BoolGetDatum(false));
1619 		}
1620 		else if (workerNode->nodeId == worker->nodeId)
1621 		{
1622 			/*
1623 			 * If this is the node we want to update and it is updated successfully,
1624 			 * then we can safely update the flag on the coordinator as well.
1625 			 */
1626 			SetWorkerColumnLocalOnly(workerNode, columnIndex, value);
1627 		}
1628 	}
1629 
1630 	return FindWorkerNode(workerNode->workerName, workerNode->workerPort);
1631 }
1632 
1633 
1634 /*
1635  * SetWorkerColumnLocalOnly function sets the column with the specified index
1636  * (see pg_dist_node.h) on the worker in pg_dist_node.
1637  * It returns the new worker node after the modification.
1638  */
1639 WorkerNode *
1640 SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
1641 {
1642 	Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
1643 	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
1644 	HeapTuple heapTuple = GetNodeTuple(workerNode->workerName, workerNode->workerPort);
1645 
1646 	Datum values[Natts_pg_dist_node];
1647 	bool isnull[Natts_pg_dist_node];
1648 	bool replace[Natts_pg_dist_node];
1649 
1650 	if (heapTuple == NULL)
1651 	{
1652 		ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
1653 							   workerNode->workerName, workerNode->workerPort)));
1654 	}
1655 
1656 	memset(replace, 0, sizeof(replace));
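	/* Anum_pg_dist_node_* attribute numbers are 1-based, the arrays here are 0-based */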
1657 	values[columnIndex - 1] = value;
1658 	isnull[columnIndex - 1] = false;
1659 	replace[columnIndex - 1] = true;
1660 
1661 	heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
1662 
1663 	CatalogTupleUpdate(pgDistNode, &heapTuple->t_self, heapTuple);
1664 
1665 	CitusInvalidateRelcacheByRelid(DistNodeRelationId());
1666 	CommandCounterIncrement();
1667 
1668 	WorkerNode *newWorkerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
1669 
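	/* close the relation but keep the RowExclusiveLock until the transaction ends */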
1670 	table_close(pgDistNode, NoLock);
1671 
1672 	return newWorkerNode;
1673 }
1674 
1675 
1676 /*
1677  * GetMetadataSyncCommandToSetNodeColumn checks whether the given workerNode and value
1678  * are valid. If they are, it returns the necessary metadata sync command as a string.
1679  */
1680 static char *
1681 GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, int columnIndex, Datum
1682 									  value)
1683 {
1684 	char *metadataSyncCommand = NULL;
1685 
1686 	switch (columnIndex)
1687 	{
1688 		case Anum_pg_dist_node_hasmetadata:
1689 		{
1690 			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "hasmetadata");
1691 			metadataSyncCommand = NodeHasmetadataUpdateCommand(workerNode->nodeId,
1692 															   DatumGetBool(value));
1693 			break;
1694 		}
1695 
1696 		case Anum_pg_dist_node_isactive:
1697 		{
1698 			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "isactive");
1699 
1700 			metadataSyncCommand = NodeStateUpdateCommand(workerNode->nodeId,
1701 														 DatumGetBool(value));
1702 			break;
1703 		}
1704 
1705 		case Anum_pg_dist_node_shouldhaveshards:
1706 		{
1707 			metadataSyncCommand = ShouldHaveShardsUpdateCommand(workerNode->nodeId,
1708 																DatumGetBool(value));
1709 			break;
1710 		}
1711 
1712 		case Anum_pg_dist_node_metadatasynced:
1713 		{
1714 			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "metadatasynced");
1715 			metadataSyncCommand = NodeMetadataSyncedUpdateCommand(workerNode->nodeId,
1716 																  DatumGetBool(value));
1717 			break;
1718 		}
1719 
1720 		default:
1721 		{
1722 			ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
1723 								   workerNode->workerName, workerNode->workerPort)));
1724 		}
1725 	}
1726 
1727 	return metadataSyncCommand;
1728 }
1729 
1730 
1731 /*
1732  * NodeHasmetadataUpdateCommand generates and returns a SQL UPDATE command
1733  * that updates the hasmetadata column of pg_dist_node, for the given nodeid.
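 * For example, for nodeid 3 and hasMetadata = true this yields:
 *   UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 3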
1734  */
1735 static char *
1736 NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata)
1737 {
1738 	StringInfo updateCommand = makeStringInfo();
1739 	char *hasMetadataString = hasMetadata ? "TRUE" : "FALSE";
1740 	appendStringInfo(updateCommand,
1741 					 "UPDATE pg_dist_node SET hasmetadata = %s "
1742 					 "WHERE nodeid = %u",
1743 					 hasMetadataString, nodeId);
1744 	return updateCommand->data;
1745 }
1746 
1747 
1748 /*
1749  * NodeMetadataSyncedUpdateCommand generates and returns a SQL UPDATE command
1750  * that updates the metadataSynced column of pg_dist_node, for the given nodeid.
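 * For example, for nodeid 3 and metadataSynced = false this yields:
 *   UPDATE pg_dist_node SET metadatasynced = FALSE WHERE nodeid = 3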
1751  */
1752 static char *
1753 NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced)
1754 {
1755 	StringInfo updateCommand = makeStringInfo();
1756 	char *metadataSyncedString = metadataSynced ? "TRUE" : "FALSE";
1757 	appendStringInfo(updateCommand,
1758 					 "UPDATE pg_dist_node SET metadatasynced = %s "
1759 					 "WHERE nodeid = %u",
1760 					 metadataSyncedString, nodeId);
1761 	return updateCommand->data;
1762 }
1763 
1764 
1765 /*
1766  * ErrorIfCoordinatorMetadataSetFalse throws an error if the input node
1767  * is the coordinator and the value is false.
1768  */
1769 static void
1770 ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *field)
1771 {
1772 	bool valueBool = DatumGetBool(value);
1773 	if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID)
1774 	{
1775 		ereport(ERROR, (errmsg("cannot change \"%s\" field of the "
1776 							   "coordinator node", field)));
1777 	}
1778 }
1779 
1780 
1781 /*
1782  * SetShouldHaveShards function sets the shouldhaveshards column of the
1783  * specified worker in pg_dist_node. It also propagates this to other metadata nodes.
1784  * It returns the new worker node after the modification.
1785  */
1786 static WorkerNode *
1787 SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards)
1788 {
1789 	return SetWorkerColumn(workerNode, Anum_pg_dist_node_shouldhaveshards, BoolGetDatum(
1790 							   shouldHaveShards));
1791 }
1792 
1793 
1794 /*
1795  * SetNodeState function sets the isactive column of the specified worker in
1796  * pg_dist_node to isActive. Also propagates this to other metadata nodes.
1797  * It returns the new worker node after the modification.
1798  */
1799 static WorkerNode *
1800 SetNodeState(char *nodeName, int nodePort, bool isActive)
1801 {
1802 	WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
1803 	return SetWorkerColumn(workerNode, Anum_pg_dist_node_isactive, BoolGetDatum(
1804 							   isActive));
1805 }
1806 
1807 
1808 /*
1809  * GetNodeTuple function returns the heap tuple for the given nodeName and nodePort. If the
1810  * node is not found this function returns NULL.
1811  *
1812  * This function may return worker nodes from other clusters.
1813  */
1814 static HeapTuple
1815 GetNodeTuple(const char *nodeName, int32 nodePort)
1816 {
1817 	Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
1818 	const int scanKeyCount = 2;
1819 	const bool indexOK = false;
1820 
1821 	ScanKeyData scanKey[2];
1822 	HeapTuple nodeTuple = NULL;
1823 
1824 	ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
1825 				BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
1826 	ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
1827 				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort));
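	/* indexOK is false, so this performs a sequential scan over pg_dist_node */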
1828 	SysScanDesc scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK,
1829 													NULL, scanKeyCount, scanKey);
1830 
1831 	HeapTuple heapTuple = systable_getnext(scanDescriptor);
1832 	if (HeapTupleIsValid(heapTuple))
1833 	{
1834 		nodeTuple = heap_copytuple(heapTuple);
1835 	}
1836 
1837 	systable_endscan(scanDescriptor);
1838 	table_close(pgDistNode, NoLock);
1839 
1840 	return nodeTuple;
1841 }
1842 
1843 
1844 /*
1845  * GetNextGroupId allocates and returns a unique groupId for the group
1846  * to be created. This allocation occurs both in shared memory and in write
1847  * ahead logs; writing to logs avoids the risk of having groupId collisions.
1848  *
1849  * Please note that the caller is still responsible for finalizing node data
1850  * and the groupId with the master node. Further note that this function relies
1851  * on an internal sequence created in initdb to generate unique identifiers.
1852  */
1853 int32
1854 GetNextGroupId()
1855 {
1856 	text *sequenceName = cstring_to_text(GROUPID_SEQUENCE_NAME);
1857 	Oid sequenceId = ResolveRelationId(sequenceName, false);
1858 	Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
1859 	Oid savedUserId = InvalidOid;
1860 	int savedSecurityContext = 0;
1861 
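	/* switch to the extension owner while calling nextval() on the internal sequence */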
1862 	GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
1863 	SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
1864 
1865 	/* generate new and unique groupId from sequence */
1866 	Datum groupIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
1867 
1868 	SetUserIdAndSecContext(savedUserId, savedSecurityContext);
1869 
1870 	int32 groupId = DatumGetInt32(groupIdDatum);
1871 
1872 	return groupId;
1873 }
1874 
1875 
1876 /*
1877  * GetNextNodeId allocates and returns a unique nodeId for the node
1878  * to be added. This allocation occurs both in shared memory and in write
1879  * ahead logs; writing to logs avoids the risk of having nodeId collisions.
1880  *
1881  * Please note that the caller is still responsible for finalizing node data
1882  * and the nodeId with the master node. Further note that this function relies
1883  * on an internal sequence created in initdb to generate unique identifiers.
1884  */
1885 int
1886 GetNextNodeId()
1887 {
1888 	text *sequenceName = cstring_to_text(NODEID_SEQUENCE_NAME);
1889 	Oid sequenceId = ResolveRelationId(sequenceName, false);
1890 	Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
1891 	Oid savedUserId = InvalidOid;
1892 	int savedSecurityContext = 0;
1893 
1894 	GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
1895 	SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
1896 
1897 	/* generate new and unique nodeId from sequence */
1898 	Datum nextNodeIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
1899 
1900 	SetUserIdAndSecContext(savedUserId, savedSecurityContext);
1901 
1902 	int nextNodeId = DatumGetUInt32(nextNodeIdDatum);
1903 
1904 	return nextNodeId;
1905 }
1906 
1907 
1908 /*
1909  * EnsureCoordinator checks whether the current node is the coordinator. If it is not,
1910  * the function errors out.
1911  */
1912 void
1913 EnsureCoordinator(void)
1914 {
1915 	int32 localGroupId = GetLocalGroupId();
1916 
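	/* the coordinator always belongs to group 0 (COORDINATOR_GROUP_ID) */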
1917 	if (localGroupId != 0)
1918 	{
1919 		ereport(ERROR, (errmsg("operation is not allowed on this node"),
1920 						errhint("Connect to the coordinator and run it again.")));
1921 	}
1922 }
1923 
1924 
1925 /*
1926  * InsertCoordinatorIfClusterEmpty can be used to ensure Citus tables can be
1927  * created even on a node that has just performed CREATE EXTENSION citus;
1928  */
1929 void
1930 InsertCoordinatorIfClusterEmpty(void)
1931 {
1932 	/* prevent concurrent node additions */
1933 	Relation pgDistNode = table_open(DistNodeRelationId(), RowShareLock);
1934 
1935 	if (!HasAnyNodes())
1936 	{
1937 		/*
1938 		 * create_distributed_table is being called for the first time and there are
1939 		 * no pg_dist_node records. Add a record for the coordinator.
1940 		 */
1941 		InsertPlaceholderCoordinatorRecord();
1942 	}
1943 
1944 	/*
1945 	 * We release the RowShareLock here; if InsertPlaceholderCoordinatorRecord was
1946 	 * called, we already hold a stronger (RowExclusive) lock.
1947 	 */
1948 	table_close(pgDistNode, RowShareLock);
1949 }
1950 
1951 
1952 /*
1953  * InsertPlaceholderCoordinatorRecord inserts a placeholder record for the coordinator
1954  * so that distributed tables can be created on a single node.
1955  */
1956 static void
1957 InsertPlaceholderCoordinatorRecord(void)
1958 {
1959 	NodeMetadata nodeMetadata = DefaultNodeMetadata();
1960 	nodeMetadata.groupId = 0;
1961 	nodeMetadata.shouldHaveShards = true;
1962 	nodeMetadata.nodeRole = PrimaryNodeRoleId();
1963 	nodeMetadata.nodeCluster = "default";
1964 
1965 	bool nodeAlreadyExists = false;
1966 
1967 	/* as long as there is a single node, localhost should be ok */
1968 	AddNodeMetadata(LocalHostName, PostPortNumber, &nodeMetadata, &nodeAlreadyExists);
1969 }
1970 
1971 
1972 /*
1973  * InsertNodeRow opens the node system catalog, and inserts a new row with the
1974  * given values into that system catalog.
1975  *
1976  * NOTE: If you call this function you probably need to have taken a
1977  * ShareRowExclusiveLock then checked that you're not adding a second primary to
1978  * an existing group. If you don't it's possible for the metadata to become inconsistent.
1979  */
1980 static void
1981 InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata)
1982 {
1983 	Datum values[Natts_pg_dist_node];
1984 	bool isNulls[Natts_pg_dist_node];
1985 
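	/* nodecluster is a "name" column, convert the C string through namein */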
1986 	Datum nodeClusterStringDatum = CStringGetDatum(nodeMetadata->nodeCluster);
1987 	Datum nodeClusterNameDatum = DirectFunctionCall1(namein, nodeClusterStringDatum);
1988 
1989 	/* form new node tuple */
1990 	memset(values, 0, sizeof(values));
1991 	memset(isNulls, false, sizeof(isNulls));
1992 
1993 	values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid);
1994 	values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(nodeMetadata->groupId);
1995 	values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
1996 	values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
1997 	values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeMetadata->nodeRack);
1998 	values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(nodeMetadata->hasMetadata);
1999 	values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(
2000 		nodeMetadata->metadataSynced);
2001 	values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(nodeMetadata->isActive);
2002 	values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeMetadata->nodeRole);
2003 	values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
2004 	values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum(
2005 		nodeMetadata->shouldHaveShards);
2006 
2007 	Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
2008 
2009 	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
2010 	HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
2011 
2012 	CatalogTupleInsert(pgDistNode, heapTuple);
2013 
2014 	CitusInvalidateRelcacheByRelid(DistNodeRelationId());
2015 
2016 	/* increment the counter so that next command can see the row */
2017 	CommandCounterIncrement();
2018 
2019 	/* close relation */
2020 	table_close(pgDistNode, NoLock);
2021 }
2022 
2023 
2024 /*
2025  * DeleteNodeRow removes the requested row from pg_dist_node table if it exists.
2026  */
2027 static void
2028 DeleteNodeRow(char *nodeName, int32 nodePort)
2029 {
2030 	const int scanKeyCount = 2;
2031 	bool indexOK = false;
2032 
2033 	ScanKeyData scanKey[2];
2034 	Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
2035 
2036 	/*
2037 	 * simple_heap_delete() expects that the caller has at least an
2038 	 * AccessShareLock on the replica identity index.
2039 	 */
2040 	Relation replicaIndex = index_open(RelationGetReplicaIndex(pgDistNode),
2041 									   AccessShareLock);
2042 
2043 	ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
2044 				BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
2045 	ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
2046 				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort));
2047 
2048 	SysScanDesc heapScan = systable_beginscan(pgDistNode, InvalidOid, indexOK,
2049 											  NULL, scanKeyCount, scanKey);
2050 
2051 	HeapTuple heapTuple = systable_getnext(heapScan);
2052 
2053 	if (!HeapTupleIsValid(heapTuple))
2054 	{
2055 		ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
2056 							   nodeName, nodePort)));
2057 	}
2058 
2059 	simple_heap_delete(pgDistNode, &(heapTuple->t_self));
2060 
2061 	systable_endscan(heapScan);
2062 
2063 	/* ensure future commands don't use the node we just removed */
2064 	CitusInvalidateRelcacheByRelid(DistNodeRelationId());
2065 
2066 	/* increment the counter so that next command won't see the row */
2067 	CommandCounterIncrement();
2068 
2069 	index_close(replicaIndex, AccessShareLock);
2070 	table_close(pgDistNode, NoLock);
2071 }
2072 
2073 
2074 /*
2075  * TupleToWorkerNode takes in a heap tuple from pg_dist_node, and
2076  * converts this tuple to an equivalent struct in memory. The function assumes
2077  * the caller already has locks on the tuple, and doesn't perform any locking.
2078  */
2079 static WorkerNode *
2080 TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
2081 {
2082 	Datum datumArray[Natts_pg_dist_node];
2083 	bool isNullArray[Natts_pg_dist_node];
2084 
2085 	Assert(!HeapTupleHasNulls(heapTuple));
2086 
2087 	/*
2088 	 * This function can be called before "ALTER TABLE ... ADD COLUMN nodecluster ...",
2089 	 * therefore heap_deform_tuple() won't set the isNullArray for this column. We
2090 	 * initialize it true to be safe in that case.
2091 	 */
2092 	memset(isNullArray, true, sizeof(isNullArray));
2093 
2094 	/*
2095 	 * We use heap_deform_tuple() instead of heap_getattr() to expand tuple
2096 	 * to contain missing values when ALTER TABLE ADD COLUMN happens.
2097 	 */
2098 	heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
2099 
2100 	char *nodeName = TextDatumGetCString(datumArray[Anum_pg_dist_node_nodename - 1]);
2101 	char *nodeRack = TextDatumGetCString(datumArray[Anum_pg_dist_node_noderack - 1]);
2102 
2103 	WorkerNode *workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
2104 	workerNode->nodeId = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeid - 1]);
2105 	workerNode->workerPort = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeport - 1]);
2106 	workerNode->groupId = DatumGetInt32(datumArray[Anum_pg_dist_node_groupid - 1]);
2107 	strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH);
2108 	strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
2109 	workerNode->hasMetadata = DatumGetBool(datumArray[Anum_pg_dist_node_hasmetadata - 1]);
2110 	workerNode->metadataSynced =
2111 		DatumGetBool(datumArray[Anum_pg_dist_node_metadatasynced - 1]);
2112 	workerNode->isActive = DatumGetBool(datumArray[Anum_pg_dist_node_isactive - 1]);
2113 	workerNode->nodeRole = DatumGetObjectId(datumArray[Anum_pg_dist_node_noderole - 1]);
2114 	workerNode->shouldHaveShards = DatumGetBool(
2115 		datumArray[Anum_pg_dist_node_shouldhaveshards - 1]);
2117 
2118 	/*
2119 	 * The nodecluster column can be missing. In the case of extension creation/upgrade,
2120 	 * the master_initialize_node_metadata function is called before the nodecluster
2121 	 * column is added to the pg_dist_node table.
2122 	 */
2123 	if (!isNullArray[Anum_pg_dist_node_nodecluster - 1])
2124 	{
2125 		Name nodeClusterName =
2126 			DatumGetName(datumArray[Anum_pg_dist_node_nodecluster - 1]);
2127 		char *nodeClusterString = NameStr(*nodeClusterName);
2128 		strlcpy(workerNode->nodeCluster, nodeClusterString, NAMEDATALEN);
2129 	}
2130 
2131 	return workerNode;
2132 }
2133 
2134 
2135 /*
2136  * StringToDatum transforms a string representation into a Datum.
2137  */
2138 Datum
2139 StringToDatum(char *inputString, Oid dataType)
2140 {
2141 	Oid typIoFunc = InvalidOid;
2142 	Oid typIoParam = InvalidOid;
2143 	int32 typeModifier = -1;
2144 
2145 	getTypeInputInfo(dataType, &typIoFunc, &typIoParam);
2146 	getBaseTypeAndTypmod(dataType, &typeModifier);
2147 
2148 	Datum datum = OidInputFunctionCall(typIoFunc, inputString, typIoParam, typeModifier);
2149 
2150 	return datum;
2151 }
2152 
2153 
2154 /*
2155  * DatumToString returns the string representation of the given datum.
2156  */
2157 char *
2158 DatumToString(Datum datum, Oid dataType)
2159 {
2160 	Oid typIoFunc = InvalidOid;
2161 	bool typIsVarlena = false;
2162 
2163 	getTypeOutputInfo(dataType, &typIoFunc, &typIsVarlena);
2164 	char *outputString = OidOutputFunctionCall(typIoFunc, datum);
2165 
2166 	return outputString;
2167 }
2168 
2169 
2170 /*
2171  * UnsetMetadataSyncedForAll sets the metadatasynced column of all metadata
2172  * nodes to false. It returns true if it updated at least one node.
2173  */
2174 static bool
2175 UnsetMetadataSyncedForAll(void)
2176 {
2177 	bool updatedAtLeastOne = false;
2178 	ScanKeyData scanKey[2];
2179 	int scanKeyCount = 2;
2180 	bool indexOK = false;
2181 
2182 	/*
2183 	 * Concurrent citus_update_node() calls might iterate and try to update
2184 	 * pg_dist_node in different orders. To protect against deadlock, we
2185 	 * get an exclusive lock here.
2186 	 */
2187 	Relation relation = table_open(DistNodeRelationId(), ExclusiveLock);
2188 	TupleDesc tupleDescriptor = RelationGetDescr(relation);
2189 	ScanKeyInit(&scanKey[0], Anum_pg_dist_node_hasmetadata,
2190 				BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true));
2191 	ScanKeyInit(&scanKey[1], Anum_pg_dist_node_metadatasynced,
2192 				BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true));
2193 
2194 	CatalogIndexState indstate = CatalogOpenIndexes(relation);
2195 
2196 	SysScanDesc scanDescriptor = systable_beginscan(relation,
2197 													InvalidOid, indexOK,
2198 													NULL, scanKeyCount, scanKey);
2199 
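	/* fetch the first matching tuple to learn whether there is anything to update */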
2200 	HeapTuple heapTuple = systable_getnext(scanDescriptor);
2201 	if (HeapTupleIsValid(heapTuple))
2202 	{
2203 		updatedAtLeastOne = true;
2204 	}
2205 
2206 	while (HeapTupleIsValid(heapTuple))
2207 	{
2208 		Datum values[Natts_pg_dist_node];
2209 		bool isnull[Natts_pg_dist_node];
2210 		bool replace[Natts_pg_dist_node];
2211 
2212 		memset(replace, false, sizeof(replace));
2213 		memset(isnull, false, sizeof(isnull));
2214 		memset(values, 0, sizeof(values));
2215 
2216 		values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false);
2217 		replace[Anum_pg_dist_node_metadatasynced - 1] = true;
2218 
2219 		HeapTuple newHeapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values,
2220 												   isnull,
2221 												   replace);
2222 
2223 		CatalogTupleUpdateWithInfo(relation, &newHeapTuple->t_self, newHeapTuple,
2224 								   indstate);
2225 
2226 		CommandCounterIncrement();
2227 
2228 		heap_freetuple(newHeapTuple);
2229 
2230 		heapTuple = systable_getnext(scanDescriptor);
2231 	}
2232 
2233 	systable_endscan(scanDescriptor);
2234 	CatalogCloseIndexes(indstate);
2235 	table_close(relation, NoLock);
2236 
2237 	return updatedAtLeastOne;
2238 }
2239