1 /*-------------------------------------------------------------------------
2  *
3  * test/shard_rebalancer.c
4  *
5  * This file contains functions used for unit testing the planning part of the
6  * shard rebalancer.
7  *
8  * Copyright (c) 2014-2019, Citus Data, Inc.
9  *
10  *-------------------------------------------------------------------------
11  */
12 
13 #include "postgres.h"
14 #include "libpq-fe.h"
15 
16 #include "safe_lib.h"
17 
18 #include "catalog/pg_type.h"
19 #include "distributed/citus_safe_lib.h"
20 #include "distributed/citus_ruleutils.h"
21 #include "distributed/connection_management.h"
22 #include "distributed/listutils.h"
23 #include "distributed/multi_physical_planner.h"
24 #include "distributed/shard_cleaner.h"
25 #include "distributed/shard_rebalancer.h"
26 #include "funcapi.h"
27 #include "miscadmin.h"
28 #include "utils/builtins.h"
29 #include "utils/int8.h"
30 #include "utils/json.h"
31 #include "utils/lsyscache.h"
32 #include "utils/memutils.h"
33 
34 /* static declarations for json conversion */
35 static List * JsonArrayToShardPlacementTestInfoList(
36 	ArrayType *shardPlacementJsonArrayObject);
37 static List * JsonArrayToWorkerTestInfoList(ArrayType *workerNodeJsonArrayObject);
38 static bool JsonFieldValueBoolDefault(Datum jsonDocument, const char *key,
39 									  bool defaultValue);
40 static uint32 JsonFieldValueUInt32Default(Datum jsonDocument, const char *key,
41 										  uint32 defaultValue);
42 static uint64 JsonFieldValueUInt64Default(Datum jsonDocument, const char *key,
43 										  uint64 defaultValue);
44 static char * JsonFieldValueString(Datum jsonDocument, const char *key);
45 static ArrayType * PlacementUpdateListToJsonArray(List *placementUpdateList);
46 static bool ShardAllowedOnNode(uint64 shardId, WorkerNode *workerNode, void *context);
47 static float NodeCapacity(WorkerNode *workerNode, void *context);
48 static ShardCost GetShardCost(uint64 shardId, void *context);
49 
50 
/* function-manager entry points defined in this file, in definition order */
PG_FUNCTION_INFO_V1(run_try_drop_marked_shards);
PG_FUNCTION_INFO_V1(shard_placement_rebalance_array);
PG_FUNCTION_INFO_V1(shard_placement_replication_array);
PG_FUNCTION_INFO_V1(worker_node_responsive);
55 
/*
 * ShardPlacementTestInfo pairs a ShardPlacement built from one element of the
 * shard placement JSON array with the test-only settings read from the same
 * element.
 */
typedef struct ShardPlacementTestInfo
{
	/* placement filled in from the JSON fields of the array element */
	ShardPlacement *placement;
	/* value of the "cost" JSON key (1 when absent); reported by GetShardCost */
	uint64 cost;
	/*
	 * value of the "next_colocation" JSON key (false when absent); when true,
	 * shard_placement_rebalance_array starts a new placement list with this
	 * placement
	 */
	bool nextColocationGroup;
} ShardPlacementTestInfo;
62 
/*
 * WorkerTestInfo pairs a WorkerNode built from one element of the worker node
 * JSON array with the test-only settings read from the same element.
 */
typedef struct WorkerTestInfo
{
	/* worker node filled in from the JSON fields of the array element */
	WorkerNode *node;
	/*
	 * list of uint64 pointers parsed from the comma separated
	 * "disallowed_shards" JSON key; left NIL when the key is absent
	 */
	List *disallowedShardIds;
	/* value of the "capacity" JSON key (1 when absent); reported by NodeCapacity */
	float capacity;
} WorkerTestInfo;
69 
70 typedef struct RebalancePlanContext
71 {
72 	List *workerTestInfoList;
73 	List *shardPlacementTestInfoList;
74 } RebalancePlacementContext;
75 
76 /*
77  * run_try_drop_marked_shards is a wrapper to run TryDropOrphanedShards.
78  */
79 Datum
run_try_drop_marked_shards(PG_FUNCTION_ARGS)80 run_try_drop_marked_shards(PG_FUNCTION_ARGS)
81 {
82 	bool waitForLocks = false;
83 	TryDropOrphanedShards(waitForLocks);
84 	PG_RETURN_VOID();
85 }
86 
87 
88 /*
89  * shard_placement_rebalance_array returns a list of operations which can make a
90  * cluster consisting of given shard placements and worker nodes balanced with
91  * respect to the given threshold. Threshold is a value between 0 and 1 which
92  * determines the evenness in shard distribution. When threshold is 0, then all
93  * nodes should have equal number of shards. As threshold increases, cluster's
94  * evenness requirements decrease, and we can rebalance the cluster using less
95  * operations.
96  */
97 Datum
shard_placement_rebalance_array(PG_FUNCTION_ARGS)98 shard_placement_rebalance_array(PG_FUNCTION_ARGS)
99 {
100 	ArrayType *workerNodeJsonArray = PG_GETARG_ARRAYTYPE_P(0);
101 	ArrayType *shardPlacementJsonArray = PG_GETARG_ARRAYTYPE_P(1);
102 	float threshold = PG_GETARG_FLOAT4(2);
103 	int32 maxShardMoves = PG_GETARG_INT32(3);
104 	bool drainOnly = PG_GETARG_BOOL(4);
105 	float utilizationImproventThreshold = PG_GETARG_FLOAT4(5);
106 
107 	List *workerNodeList = NIL;
108 	List *shardPlacementListList = NIL;
109 	List *shardPlacementList = NIL;
110 	WorkerTestInfo *workerTestInfo = NULL;
111 	ShardPlacementTestInfo *shardPlacementTestInfo = NULL;
112 	RebalancePlanFunctions rebalancePlanFunctions = {
113 		.shardAllowedOnNode = ShardAllowedOnNode,
114 		.nodeCapacity = NodeCapacity,
115 		.shardCost = GetShardCost,
116 	};
117 	RebalancePlacementContext context = {
118 		.workerTestInfoList = NULL,
119 	};
120 
121 	context.workerTestInfoList = JsonArrayToWorkerTestInfoList(workerNodeJsonArray);
122 	context.shardPlacementTestInfoList = JsonArrayToShardPlacementTestInfoList(
123 		shardPlacementJsonArray);
124 
125 	/* we don't need original arrays any more, so we free them to save memory */
126 	pfree(workerNodeJsonArray);
127 	pfree(shardPlacementJsonArray);
128 
129 	/* map workerTestInfoList to a list of its WorkerNodes */
130 	foreach_ptr(workerTestInfo, context.workerTestInfoList)
131 	{
132 		workerNodeList = lappend(workerNodeList, workerTestInfo->node);
133 	}
134 
135 	/* map shardPlacementTestInfoList to a list of list of its ShardPlacements */
136 	foreach_ptr(shardPlacementTestInfo, context.shardPlacementTestInfoList)
137 	{
138 		if (shardPlacementTestInfo->nextColocationGroup)
139 		{
140 			shardPlacementList = SortList(shardPlacementList, CompareShardPlacements);
141 			shardPlacementListList = lappend(shardPlacementListList, shardPlacementList);
142 			shardPlacementList = NIL;
143 		}
144 		shardPlacementList = lappend(shardPlacementList,
145 									 shardPlacementTestInfo->placement);
146 	}
147 	shardPlacementList = SortList(shardPlacementList, CompareShardPlacements);
148 	shardPlacementListList = lappend(shardPlacementListList, shardPlacementList);
149 
150 	rebalancePlanFunctions.context = &context;
151 
152 	/* sort the lists to make the function more deterministic */
153 	workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
154 
155 	List *placementUpdateList = RebalancePlacementUpdates(workerNodeList,
156 														  shardPlacementListList,
157 														  threshold,
158 														  maxShardMoves,
159 														  drainOnly,
160 														  utilizationImproventThreshold,
161 														  &rebalancePlanFunctions);
162 	ArrayType *placementUpdateJsonArray = PlacementUpdateListToJsonArray(
163 		placementUpdateList);
164 
165 	PG_RETURN_ARRAYTYPE_P(placementUpdateJsonArray);
166 }
167 
168 
169 /*
170  * ShardAllowedOnNode is the function that checks if shard is allowed to be on
171  * a worker when running the shard rebalancer unit tests.
172  */
173 static bool
ShardAllowedOnNode(uint64 shardId,WorkerNode * workerNode,void * voidContext)174 ShardAllowedOnNode(uint64 shardId, WorkerNode *workerNode, void *voidContext)
175 {
176 	RebalancePlacementContext *context = voidContext;
177 	WorkerTestInfo *workerTestInfo = NULL;
178 	uint64 *disallowedShardIdPtr = NULL;
179 	foreach_ptr(workerTestInfo, context->workerTestInfoList)
180 	{
181 		if (workerTestInfo->node == workerNode)
182 		{
183 			break;
184 		}
185 	}
186 	Assert(workerTestInfo != NULL);
187 
188 	foreach_ptr(disallowedShardIdPtr, workerTestInfo->disallowedShardIds)
189 	{
190 		if (shardId == *disallowedShardIdPtr)
191 		{
192 			return false;
193 		}
194 	}
195 	return true;
196 }
197 
198 
199 /*
200  * NodeCapacity is the function that gets the capacity of a worker when running
201  * the shard rebalancer unit tests.
202  */
203 static float
NodeCapacity(WorkerNode * workerNode,void * voidContext)204 NodeCapacity(WorkerNode *workerNode, void *voidContext)
205 {
206 	RebalancePlacementContext *context = voidContext;
207 	WorkerTestInfo *workerTestInfo = NULL;
208 	foreach_ptr(workerTestInfo, context->workerTestInfoList)
209 	{
210 		if (workerTestInfo->node == workerNode)
211 		{
212 			break;
213 		}
214 	}
215 	Assert(workerTestInfo != NULL);
216 	return workerTestInfo->capacity;
217 }
218 
219 
220 /*
221  * GetShardCost is the function that gets the ShardCost of a shard when running
222  * the shard rebalancer unit tests.
223  */
224 static ShardCost
GetShardCost(uint64 shardId,void * voidContext)225 GetShardCost(uint64 shardId, void *voidContext)
226 {
227 	RebalancePlacementContext *context = voidContext;
228 	ShardCost shardCost;
229 	memset_struct_0(shardCost);
230 	shardCost.shardId = shardId;
231 
232 	ShardPlacementTestInfo *shardPlacementTestInfo = NULL;
233 	foreach_ptr(shardPlacementTestInfo, context->shardPlacementTestInfoList)
234 	{
235 		if (shardPlacementTestInfo->placement->shardId == shardId)
236 		{
237 			break;
238 		}
239 	}
240 	Assert(shardPlacementTestInfo != NULL);
241 	shardCost.cost = shardPlacementTestInfo->cost;
242 	return shardCost;
243 }
244 
245 
246 /*
247  * shard_placement_replication_array returns a list of operations which will
248  * replicate under-replicated shards in a cluster consisting of given shard
249  * placements and worker nodes. A shard is under-replicated if it has less
250  * active placements than the given shard replication factor.
251  */
252 Datum
shard_placement_replication_array(PG_FUNCTION_ARGS)253 shard_placement_replication_array(PG_FUNCTION_ARGS)
254 {
255 	ArrayType *workerNodeJsonArray = PG_GETARG_ARRAYTYPE_P(0);
256 	ArrayType *shardPlacementJsonArray = PG_GETARG_ARRAYTYPE_P(1);
257 	uint32 shardReplicationFactor = PG_GETARG_INT32(2);
258 
259 	List *workerNodeList = NIL;
260 	List *shardPlacementList = NIL;
261 	WorkerTestInfo *workerTestInfo = NULL;
262 	ShardPlacementTestInfo *shardPlacementTestInfo = NULL;
263 
264 	/* validate shard replication factor */
265 	if (shardReplicationFactor < SHARD_REPLICATION_FACTOR_MINIMUM ||
266 		shardReplicationFactor > SHARD_REPLICATION_FACTOR_MAXIMUM)
267 	{
268 		ereport(ERROR, (errmsg("invalid shard replication factor"),
269 						errhint("Shard replication factor must be an integer "
270 								"between %d and %d", SHARD_REPLICATION_FACTOR_MINIMUM,
271 								SHARD_REPLICATION_FACTOR_MAXIMUM)));
272 	}
273 
274 	List *workerTestInfoList = JsonArrayToWorkerTestInfoList(workerNodeJsonArray);
275 	List *shardPlacementTestInfoList = JsonArrayToShardPlacementTestInfoList(
276 		shardPlacementJsonArray);
277 
278 	/* we don't need original arrays any more, so we free them to save memory */
279 	pfree(workerNodeJsonArray);
280 	pfree(shardPlacementJsonArray);
281 
282 	foreach_ptr(workerTestInfo, workerTestInfoList)
283 	{
284 		workerNodeList = lappend(workerNodeList, workerTestInfo->node);
285 	}
286 
287 	foreach_ptr(shardPlacementTestInfo, shardPlacementTestInfoList)
288 	{
289 		shardPlacementList = lappend(shardPlacementList,
290 									 shardPlacementTestInfo->placement);
291 	}
292 
293 	/* sort the lists to make the function more deterministic */
294 	workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
295 	shardPlacementList = SortList(shardPlacementList, CompareShardPlacements);
296 
297 	List *placementUpdateList = ReplicationPlacementUpdates(workerNodeList,
298 															shardPlacementList,
299 															shardReplicationFactor);
300 	ArrayType *placementUpdateJsonArray = PlacementUpdateListToJsonArray(
301 		placementUpdateList);
302 
303 	PG_RETURN_ARRAYTYPE_P(placementUpdateJsonArray);
304 }
305 
306 
307 /*
308  * JsonArrayToShardPlacementTestInfoList converts the given shard placement json array
309  * to a list of ShardPlacement structs.
310  */
311 static List *
JsonArrayToShardPlacementTestInfoList(ArrayType * shardPlacementJsonArrayObject)312 JsonArrayToShardPlacementTestInfoList(ArrayType *shardPlacementJsonArrayObject)
313 {
314 	List *shardPlacementTestInfoList = NIL;
315 	Datum *shardPlacementJsonArray = NULL;
316 	int placementCount = 0;
317 
318 	/*
319 	 * Memory is not automatically freed when we call UDFs using DirectFunctionCall.
320 	 * We call these functions in functionCallContext, so we can free the memory
321 	 * once they return.
322 	 */
323 	MemoryContext functionCallContext = AllocSetContextCreate(CurrentMemoryContext,
324 															  "Function Call Context",
325 															  ALLOCSET_DEFAULT_MINSIZE,
326 															  ALLOCSET_DEFAULT_INITSIZE,
327 															  ALLOCSET_DEFAULT_MAXSIZE);
328 
329 	deconstruct_array(shardPlacementJsonArrayObject, JSONOID, -1, false, 'i',
330 					  &shardPlacementJsonArray, NULL, &placementCount);
331 
332 	for (int placementIndex = 0; placementIndex < placementCount; placementIndex++)
333 	{
334 		Datum placementJson = shardPlacementJsonArray[placementIndex];
335 		ShardPlacementTestInfo *placementTestInfo = palloc0(
336 			sizeof(ShardPlacementTestInfo));
337 
338 		MemoryContext oldContext = MemoryContextSwitchTo(functionCallContext);
339 
340 		uint64 shardId = JsonFieldValueUInt64Default(
341 			placementJson, FIELD_NAME_SHARD_ID, placementIndex + 1);
342 		uint64 shardLength = JsonFieldValueUInt64Default(
343 			placementJson, FIELD_NAME_SHARD_LENGTH, 1);
344 		int shardState = JsonFieldValueUInt32Default(
345 			placementJson, FIELD_NAME_SHARD_STATE, SHARD_STATE_ACTIVE);
346 		char *nodeName = JsonFieldValueString(placementJson, FIELD_NAME_NODE_NAME);
347 		if (nodeName == NULL)
348 		{
349 			ereport(ERROR, (errmsg(FIELD_NAME_NODE_NAME " needs be set")));
350 		}
351 		int nodePort = JsonFieldValueUInt32Default(
352 			placementJson, FIELD_NAME_NODE_PORT, 5432);
353 		uint64 placementId = JsonFieldValueUInt64Default(
354 			placementJson, FIELD_NAME_PLACEMENT_ID, placementIndex + 1);
355 
356 		uint64 cost = JsonFieldValueUInt64Default(placementJson, "cost", 1);
357 		bool nextColocationGroup =
358 			JsonFieldValueBoolDefault(placementJson, "next_colocation", false);
359 
360 		MemoryContextSwitchTo(oldContext);
361 
362 		placementTestInfo->placement = palloc0(sizeof(ShardPlacement));
363 		placementTestInfo->placement->shardId = shardId;
364 		placementTestInfo->placement->shardLength = shardLength;
365 		placementTestInfo->placement->shardState = shardState;
366 		placementTestInfo->placement->nodeName = pstrdup(nodeName);
367 		placementTestInfo->placement->nodePort = nodePort;
368 		placementTestInfo->placement->placementId = placementId;
369 		placementTestInfo->cost = cost;
370 		placementTestInfo->nextColocationGroup = nextColocationGroup;
371 
372 		/*
373 		 * We have copied whatever we needed from the UDF calls, so we can free
374 		 * the memory allocated by them.
375 		 */
376 		MemoryContextReset(functionCallContext);
377 
378 
379 		shardPlacementTestInfoList = lappend(shardPlacementTestInfoList,
380 											 placementTestInfo);
381 	}
382 
383 	pfree(shardPlacementJsonArray);
384 
385 	return shardPlacementTestInfoList;
386 }
387 
388 
389 /*
390  * JsonArrayToWorkerNodeList converts the given worker node json array to a list
391  * of WorkerNode structs.
392  */
393 static List *
JsonArrayToWorkerTestInfoList(ArrayType * workerNodeJsonArrayObject)394 JsonArrayToWorkerTestInfoList(ArrayType *workerNodeJsonArrayObject)
395 {
396 	List *workerTestInfoList = NIL;
397 	Datum *workerNodeJsonArray = NULL;
398 	int workerNodeCount = 0;
399 
400 	deconstruct_array(workerNodeJsonArrayObject, JSONOID, -1, false, 'i',
401 					  &workerNodeJsonArray, NULL, &workerNodeCount);
402 
403 	for (int workerNodeIndex = 0; workerNodeIndex < workerNodeCount; workerNodeIndex++)
404 	{
405 		Datum workerNodeJson = workerNodeJsonArray[workerNodeIndex];
406 		char *workerName = JsonFieldValueString(workerNodeJson, FIELD_NAME_WORKER_NAME);
407 		if (workerName == NULL)
408 		{
409 			ereport(ERROR, (errmsg(FIELD_NAME_WORKER_NAME " needs be set")));
410 		}
411 		uint32 workerPort = JsonFieldValueUInt32Default(workerNodeJson,
412 														FIELD_NAME_WORKER_PORT, 5432);
413 		List *disallowedShardIdList = NIL;
414 
415 
416 		WorkerTestInfo *workerTestInfo = palloc0(sizeof(WorkerTestInfo));
417 		WorkerNode *workerNode = palloc0(sizeof(WorkerNode));
418 		strncpy_s(workerNode->workerName, sizeof(workerNode->workerName), workerName,
419 				  WORKER_LENGTH);
420 		workerNode->nodeId = workerNodeIndex;
421 		workerNode->workerPort = workerPort;
422 		workerNode->shouldHaveShards = true;
423 		workerNode->nodeRole = PrimaryNodeRoleId();
424 		workerTestInfo->node = workerNode;
425 
426 		workerTestInfo->capacity = JsonFieldValueUInt64Default(workerNodeJson,
427 															   "capacity", 1);
428 
429 		workerTestInfoList = lappend(workerTestInfoList, workerTestInfo);
430 		char *disallowedShardsString = JsonFieldValueString(
431 			workerNodeJson, "disallowed_shards");
432 
433 		if (disallowedShardsString == NULL)
434 		{
435 			continue;
436 		}
437 
438 		char *strtokPosition = NULL;
439 		char *shardString = strtok_r(disallowedShardsString, ",", &strtokPosition);
440 		while (shardString != NULL)
441 		{
442 			uint64 *shardInt = palloc0(sizeof(uint64));
443 			*shardInt = SafeStringToUint64(shardString);
444 			disallowedShardIdList = lappend(disallowedShardIdList, shardInt);
445 			shardString = strtok_r(NULL, ",", &strtokPosition);
446 		}
447 		workerTestInfo->disallowedShardIds = disallowedShardIdList;
448 	}
449 
450 	return workerTestInfoList;
451 }
452 
453 
454 /*
455  * JsonFieldValueBoolDefault gets the value of the given key in the given json
456  * document and returns it as a boolean. If the field does not exist in the
457  * JSON it returns defaultValue.
458  */
459 static bool
JsonFieldValueBoolDefault(Datum jsonDocument,const char * key,bool defaultValue)460 JsonFieldValueBoolDefault(Datum jsonDocument, const char *key, bool defaultValue)
461 {
462 	char *valueString = JsonFieldValueString(jsonDocument, key);
463 	if (valueString == NULL)
464 	{
465 		return defaultValue;
466 	}
467 	Datum valueBoolDatum = DirectFunctionCall1(boolin, CStringGetDatum(valueString));
468 
469 	return DatumGetBool(valueBoolDatum);
470 }
471 
472 
473 /*
474  * JsonFieldValueUInt32Default gets the value of the given key in the given json
475  * document and returns it as an unsigned 32-bit integer. If the field does not
476  * exist in the JSON it returns defaultValue.
477  */
478 static uint32
JsonFieldValueUInt32Default(Datum jsonDocument,const char * key,uint32 defaultValue)479 JsonFieldValueUInt32Default(Datum jsonDocument, const char *key, uint32 defaultValue)
480 {
481 	char *valueString = JsonFieldValueString(jsonDocument, key);
482 	if (valueString == NULL)
483 	{
484 		return defaultValue;
485 	}
486 	Datum valueInt4Datum = DirectFunctionCall1(int4in, CStringGetDatum(valueString));
487 
488 	uint32 valueUInt32 = DatumGetInt32(valueInt4Datum);
489 	return valueUInt32;
490 }
491 
492 
493 /*
494  * JsonFieldValueUInt64 gets the value of the given key in the given json
495  * document and returns it as an unsigned 64-bit integer. If the field does not
496  * exist in the JSON it returns defaultValue.
497  */
498 static uint64
JsonFieldValueUInt64Default(Datum jsonDocument,const char * key,uint64 defaultValue)499 JsonFieldValueUInt64Default(Datum jsonDocument, const char *key, uint64 defaultValue)
500 {
501 	char *valueString = JsonFieldValueString(jsonDocument, key);
502 	if (valueString == NULL)
503 	{
504 		return defaultValue;
505 	}
506 	Datum valueInt8Datum = DirectFunctionCall1(int8in, CStringGetDatum(valueString));
507 
508 	uint64 valueUInt64 = DatumGetInt64(valueInt8Datum);
509 	return valueUInt64;
510 }
511 
512 
513 /*
514  * DirectFunctionalCall2Null is a version of DirectFunctionCall2 that can
515  * return NULL. It still does not support NULL arguments though.
516  */
517 static Datum
DirectFunctionCall2Null(PGFunction func,bool * isnull,Datum arg1,Datum arg2)518 DirectFunctionCall2Null(PGFunction func, bool *isnull, Datum arg1, Datum arg2)
519 {
520 	LOCAL_FCINFO(fcinfo, 2);
521 
522 	InitFunctionCallInfoData(*fcinfo, NULL, 2, InvalidOid, NULL, NULL);
523 
524 	fcinfo->args[0].value = arg1;
525 	fcinfo->args[0].isnull = false;
526 	fcinfo->args[1].value = arg2;
527 	fcinfo->args[1].isnull = false;
528 
529 	Datum result = (*func)(fcinfo);
530 	if (fcinfo->isnull)
531 	{
532 		*isnull = true;
533 		return 0;
534 	}
535 
536 	*isnull = false;
537 	return result;
538 }
539 
540 
541 /*
542  * JsonFieldValueString gets the value of the given key in the given json
543  * document and returns it as a string. If the field does not exist in the JSON
544  * it returns NULL.
545  */
546 static char *
JsonFieldValueString(Datum jsonDocument,const char * key)547 JsonFieldValueString(Datum jsonDocument, const char *key)
548 {
549 	bool isnull = false;
550 	Datum keyDatum = PointerGetDatum(cstring_to_text(key));
551 
552 	Datum valueTextDatum = DirectFunctionCall2Null(
553 		json_object_field_text, &isnull, jsonDocument, keyDatum);
554 	if (isnull)
555 	{
556 		return NULL;
557 	}
558 
559 	char *valueString = text_to_cstring(DatumGetTextP(valueTextDatum));
560 	return valueString;
561 }
562 
563 
564 /*
565  * PlacementUpdateListToJsonArray converts the given list of placement update
566  * data to a json array.
567  */
568 static ArrayType *
PlacementUpdateListToJsonArray(List * placementUpdateList)569 PlacementUpdateListToJsonArray(List *placementUpdateList)
570 {
571 	ListCell *placementUpdateCell = NULL;
572 	int placementUpdateIndex = 0;
573 
574 	int placementUpdateCount = list_length(placementUpdateList);
575 	Datum *placementUpdateJsonArray = palloc0(placementUpdateCount * sizeof(Datum));
576 
577 	foreach(placementUpdateCell, placementUpdateList)
578 	{
579 		PlacementUpdateEvent *placementUpdateEvent = lfirst(placementUpdateCell);
580 		WorkerNode *sourceNode = placementUpdateEvent->sourceNode;
581 		WorkerNode *targetNode = placementUpdateEvent->targetNode;
582 
583 		StringInfo escapedSourceName = makeStringInfo();
584 		escape_json(escapedSourceName, sourceNode->workerName);
585 
586 		StringInfo escapedTargetName = makeStringInfo();
587 		escape_json(escapedTargetName, targetNode->workerName);
588 
589 		StringInfo placementUpdateJsonString = makeStringInfo();
590 		appendStringInfo(placementUpdateJsonString, PLACEMENT_UPDATE_JSON_FORMAT,
591 						 placementUpdateEvent->updateType, placementUpdateEvent->shardId,
592 						 escapedSourceName->data, sourceNode->workerPort,
593 						 escapedTargetName->data, targetNode->workerPort);
594 
595 		Datum placementUpdateStringDatum = CStringGetDatum(
596 			placementUpdateJsonString->data);
597 		Datum placementUpdateJsonDatum = DirectFunctionCall1(json_in,
598 															 placementUpdateStringDatum);
599 
600 		placementUpdateJsonArray[placementUpdateIndex] = placementUpdateJsonDatum;
601 		placementUpdateIndex++;
602 	}
603 
604 	ArrayType *placementUpdateObject = construct_array(placementUpdateJsonArray,
605 													   placementUpdateCount, JSONOID,
606 													   -1, false, 'i');
607 
608 	return placementUpdateObject;
609 }
610 
611 
612 /*
613  * worker_node_responsive returns true if the given worker node is responsive.
614  * Otherwise, it returns false.
615  */
616 Datum
worker_node_responsive(PG_FUNCTION_ARGS)617 worker_node_responsive(PG_FUNCTION_ARGS)
618 {
619 	text *workerNameText = PG_GETARG_TEXT_PP(0);
620 	uint32 workerPort = PG_GETARG_INT32(1);
621 	int connectionFlag = FORCE_NEW_CONNECTION;
622 
623 	bool workerNodeResponsive = false;
624 	const char *workerName = text_to_cstring(workerNameText);
625 
626 	MultiConnection *connection = GetNodeConnection(connectionFlag, workerName,
627 													workerPort);
628 
629 	if (connection != NULL && connection->pgConn != NULL)
630 	{
631 		if (PQstatus(connection->pgConn) == CONNECTION_OK)
632 		{
633 			workerNodeResponsive = true;
634 		}
635 
636 		CloseConnection(connection);
637 	}
638 
639 	PG_RETURN_BOOL(workerNodeResponsive);
640 }
641