1 /*-------------------------------------------------------------------------
2  *
3  * citus_outfuncs.c
4  *	  Output functions for Citus tree nodes.
5  *
6  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  * Portions Copyright (c) Citus Data, Inc.
9  *
10  * NOTES
11  *	  This is a wrapper around postgres' nodeToString() that additionally
12  *	  supports Citus node types.
13  *
14  *    Keep as closely aligned with the upstream version as possible.
15  *
16  *-------------------------------------------------------------------------
17  */
18 
19 #include "postgres.h"
20 
21 #include "distributed/pg_version_constants.h"
22 
23 #include <ctype.h>
24 
25 #include "distributed/citus_nodefuncs.h"
26 #include "distributed/citus_nodes.h"
27 #include "distributed/coordinator_protocol.h"
28 #include "distributed/errormessage.h"
29 #include "distributed/log_utils.h"
30 #include "distributed/multi_logical_planner.h"
31 #include "distributed/multi_physical_planner.h"
32 #include "distributed/distributed_planner.h"
33 #include "distributed/multi_server_executor.h"
34 #include "distributed/metadata_utility.h"
35 #include "lib/stringinfo.h"
36 #include "nodes/plannodes.h"
37 #include "nodes/pathnodes.h"
38 #include "utils/datum.h"
39 
40 
41 /*
42  * Macros to simplify output of different kinds of fields.  Use these
43  * wherever possible to reduce the chance for silly typos.  Note that these
44  * hard-wire conventions about the names of the local variables in an Out
45  * routine.
46  */
47 
48 /* Store const reference to raw input node in local named 'node' */
49 #define WRITE_LOCALS(nodeTypeName) \
50 		const nodeTypeName *node = (const nodeTypeName *) raw_node
51 
52 /* Write the label for the node type */
53 #define WRITE_NODE_TYPE(nodelabel) \
54 	(void) 0
55 
56 /* Write an integer field (anything written as ":fldname %d") */
57 #define WRITE_INT_FIELD(fldname) \
58 	appendStringInfo(str, " :" CppAsString(fldname) " %d", node->fldname)
59 
60 /* Write an 64-bit integer field (anything written as ":fldname %d") */
61 #define WRITE_INT64_FIELD(fldname) \
62 	appendStringInfo(str, " :" CppAsString(fldname) " " INT64_FORMAT, node->fldname)
63 
64 
65 /* Write an unsigned integer field (anything written as ":fldname %u") */
66 #define WRITE_UINT_FIELD(fldname) \
67 	appendStringInfo(str, " :" CppAsString(fldname) " %u", node->fldname)
68 
69 /* XXX: Citus: Write an unsigned 64-bit integer field */
70 #define WRITE_UINT64_FIELD(fldname) \
71 	appendStringInfo(str, " :" CppAsString(fldname) " " UINT64_FORMAT, node->fldname)
72 
73 /* Write an OID field (don't hard-wire assumption that OID is same as uint) */
74 #define WRITE_OID_FIELD(fldname) \
75 	appendStringInfo(str, " :" CppAsString(fldname) " %u", node->fldname)
76 
77 /* Write a char field (ie, one ascii character) */
78 #define WRITE_CHAR_FIELD(fldname) \
79 	appendStringInfo(str, " :" CppAsString(fldname) " %c", node->fldname)
80 
81 /* Write an enumerated-type field as an integer code */
82 #define WRITE_ENUM_FIELD(fldname, enumtype) \
83 	appendStringInfo(str, " :" CppAsString(fldname) " %d", \
84 					 (int) node->fldname)
85 
86 /* Write a float field --- caller must give format to define precision */
87 #define WRITE_FLOAT_FIELD(fldname,format) \
88 	appendStringInfo(str, " :" CppAsString(fldname) " " format, node->fldname)
89 
90 /* Write a boolean field */
91 #define WRITE_BOOL_FIELD(fldname) \
92 	appendStringInfo(str, " :" CppAsString(fldname) " %s", \
93 					 booltostr(node->fldname))
94 
95 /* Write a character-string (possibly NULL) field */
96 #define WRITE_STRING_FIELD(fldname) \
97 	(appendStringInfo(str, " :" CppAsString(fldname) " "), \
98 	 outToken(str, node->fldname))
99 
100 /* Write a parse location field (actually same as INT case) */
101 #define WRITE_LOCATION_FIELD(fldname) \
102 	appendStringInfo(str, " :" CppAsString(fldname) " %d", node->fldname)
103 
104 /* Write a Node field */
105 #define WRITE_NODE_FIELD(fldname) \
106 	(appendStringInfo(str, " :" CppAsString(fldname) " "), \
107 	 outNode(str, node->fldname))
108 
109 /* Write a bitmapset field */
110 #define WRITE_BITMAPSET_FIELD(fldname) \
111 	(appendStringInfo(str, " :" CppAsString(fldname) " "), \
112 	 _outBitmapset(str, node->fldname))
113 
114 #define WRITE_CUSTOM_FIELD(fldname, fldvalue) \
115 	(appendStringInfo(str, " :" CppAsString(fldname) " "), \
116 	appendStringInfoString(str, (fldvalue)))
117 
118 
119 /* Write an integer array (anything written as ":fldname (%d, %d") */
120 #define WRITE_INT_ARRAY(fldname, count) \
121 	appendStringInfo(str, " :" CppAsString(fldname) " ("); \
122 	{ \
123 		int i;\
124 		for (i = 0; i < count; i++) \
125 		{ \
126 			if (i > 0) \
127 			{ \
128 				appendStringInfo(str, ", "); \
129 			} \
130 			appendStringInfo(str, "%d", node->fldname[i]); \
131 		}\
132 	}\
133 	appendStringInfo(str, ")")
134 
135 
136 /* Write an enum array (anything written as ":fldname (%d, %d") */
137 #define WRITE_ENUM_ARRAY(fldname, count) WRITE_INT_ARRAY(fldname, count)
138 
139 
140 #define booltostr(x)  ((x) ? "true" : "false")
141 static void WriteTaskQuery(OUTFUNC_ARGS);
142 
143 /*****************************************************************************
144  *	Output routines for Citus node types
145  *****************************************************************************/
146 
147 static void
OutMultiUnaryNodeFields(StringInfo str,const MultiUnaryNode * node)148 OutMultiUnaryNodeFields(StringInfo str, const MultiUnaryNode *node)
149 {
150 	WRITE_NODE_FIELD(childNode);
151 }
152 
153 
154 static void
OutMultiBinaryNodeFields(StringInfo str,const MultiBinaryNode * node)155 OutMultiBinaryNodeFields(StringInfo str, const MultiBinaryNode *node)
156 {
157 	WRITE_NODE_FIELD(leftChildNode);
158 	WRITE_NODE_FIELD(rightChildNode);
159 }
160 
161 void
OutMultiNode(OUTFUNC_ARGS)162 OutMultiNode(OUTFUNC_ARGS)
163 {
164 	WRITE_NODE_TYPE("MULTINODE");
165 }
166 
167 
168 void
OutMultiTreeRoot(OUTFUNC_ARGS)169 OutMultiTreeRoot(OUTFUNC_ARGS)
170 {
171 	WRITE_LOCALS(MultiTreeRoot);
172 
173 	WRITE_NODE_TYPE("MULTITREEROOT");
174 
175 	OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
176 }
177 
178 
179 void
OutDistributedPlan(OUTFUNC_ARGS)180 OutDistributedPlan(OUTFUNC_ARGS)
181 {
182 	WRITE_LOCALS(DistributedPlan);
183 
184 	WRITE_NODE_TYPE("DISTRIBUTEDPLAN");
185 
186 	WRITE_UINT64_FIELD(planId);
187 	WRITE_ENUM_FIELD(modLevel, RowModifyLevel);
188 	WRITE_BOOL_FIELD(expectResults);
189 
190 	WRITE_NODE_FIELD(workerJob);
191 	WRITE_NODE_FIELD(combineQuery);
192 	WRITE_UINT64_FIELD(queryId);
193 	WRITE_NODE_FIELD(relationIdList);
194 	WRITE_OID_FIELD(targetRelationId);
195 	WRITE_NODE_FIELD(insertSelectQuery);
196 	WRITE_STRING_FIELD(intermediateResultIdPrefix);
197 
198 	WRITE_NODE_FIELD(subPlanList);
199 	WRITE_NODE_FIELD(usedSubPlanNodeList);
200 	WRITE_BOOL_FIELD(fastPathRouterPlan);
201 	WRITE_UINT_FIELD(numberOfTimesExecuted);
202 
203 	WRITE_NODE_FIELD(planningError);
204 }
205 
206 
207 void
OutDistributedSubPlan(OUTFUNC_ARGS)208 OutDistributedSubPlan(OUTFUNC_ARGS)
209 {
210 	WRITE_LOCALS(DistributedSubPlan);
211 
212 	WRITE_NODE_TYPE("DISTRIBUTEDSUBPLAN");
213 
214 	WRITE_UINT_FIELD(subPlanId);
215 	WRITE_NODE_FIELD(plan);
216 }
217 
218 void
OutUsedDistributedSubPlan(OUTFUNC_ARGS)219 OutUsedDistributedSubPlan(OUTFUNC_ARGS)
220 {
221 	WRITE_LOCALS(UsedDistributedSubPlan);
222 
223 	WRITE_NODE_TYPE("USEDDISTRIBUTEDSUBPLAN");
224 
225 	WRITE_STRING_FIELD(subPlanId);
226 	WRITE_ENUM_FIELD(accessType, SubPlanAccessType);
227 }
228 
229 
230 void
OutMultiProject(OUTFUNC_ARGS)231 OutMultiProject(OUTFUNC_ARGS)
232 {
233 	WRITE_LOCALS(MultiProject);
234 	WRITE_NODE_TYPE("MULTIPROJECT");
235 
236 	WRITE_NODE_FIELD(columnList);
237 
238 	OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
239 }
240 
241 
242 void
OutMultiCollect(OUTFUNC_ARGS)243 OutMultiCollect(OUTFUNC_ARGS)
244 {
245 	WRITE_LOCALS(MultiCollect);
246 	WRITE_NODE_TYPE("MULTICOLLECT");
247 
248 	OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
249 }
250 
251 
252 void
OutMultiSelect(OUTFUNC_ARGS)253 OutMultiSelect(OUTFUNC_ARGS)
254 {
255 	WRITE_LOCALS(MultiSelect);
256 	WRITE_NODE_TYPE("MULTISELECT");
257 
258 	WRITE_NODE_FIELD(selectClauseList);
259 
260 	OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
261 }
262 
263 
264 void
OutMultiTable(OUTFUNC_ARGS)265 OutMultiTable(OUTFUNC_ARGS)
266 {
267 	WRITE_LOCALS(MultiTable);
268 	WRITE_NODE_TYPE("MULTITABLE");
269 
270 	WRITE_OID_FIELD(relationId);
271 	WRITE_INT_FIELD(rangeTableId);
272 
273 	OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
274 }
275 
276 
277 void
OutMultiJoin(OUTFUNC_ARGS)278 OutMultiJoin(OUTFUNC_ARGS)
279 {
280 	WRITE_LOCALS(MultiJoin);
281 	WRITE_NODE_TYPE("MULTIJOIN");
282 
283 	WRITE_NODE_FIELD(joinClauseList);
284 	WRITE_ENUM_FIELD(joinRuleType, JoinRuleType);
285 	WRITE_ENUM_FIELD(joinType, JoinType);
286 
287 	OutMultiBinaryNodeFields(str, (const MultiBinaryNode *) node);
288 }
289 
290 
291 void
OutMultiPartition(OUTFUNC_ARGS)292 OutMultiPartition(OUTFUNC_ARGS)
293 {
294 	WRITE_LOCALS(MultiPartition);
295 	WRITE_NODE_TYPE("MULTIPARTITION");
296 
297 	WRITE_NODE_FIELD(partitionColumn);
298 
299 	OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
300 }
301 
302 
303 void
OutMultiCartesianProduct(OUTFUNC_ARGS)304 OutMultiCartesianProduct(OUTFUNC_ARGS)
305 {
306 	WRITE_LOCALS(MultiCartesianProduct);
307 	WRITE_NODE_TYPE("MULTICARTESIANPRODUCT");
308 
309 	OutMultiBinaryNodeFields(str, (const MultiBinaryNode *) node);
310 }
311 
312 
313 
314 
315 void
OutMultiExtendedOp(OUTFUNC_ARGS)316 OutMultiExtendedOp(OUTFUNC_ARGS)
317 {
318 	WRITE_LOCALS(MultiExtendedOp);
319 	WRITE_NODE_TYPE("MULTIEXTENDEDOP");
320 
321 	WRITE_NODE_FIELD(targetList);
322 	WRITE_NODE_FIELD(groupClauseList);
323 	WRITE_NODE_FIELD(sortClauseList);
324 	WRITE_NODE_FIELD(limitCount);
325 	WRITE_NODE_FIELD(limitOffset);
326 #if PG_VERSION_NUM >= PG_VERSION_13
327 	WRITE_ENUM_FIELD(limitOption, LimitOption);
328 #endif
329 	WRITE_NODE_FIELD(havingQual);
330 	WRITE_BOOL_FIELD(hasDistinctOn);
331 	WRITE_NODE_FIELD(distinctClause);
332 	WRITE_BOOL_FIELD(hasWindowFuncs);
333 	WRITE_BOOL_FIELD(onlyPushableWindowFunctions);
334 	WRITE_NODE_FIELD(windowClause);
335 
336 	OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
337 }
338 
339 static void
OutJobFields(StringInfo str,const Job * node)340 OutJobFields(StringInfo str, const Job *node)
341 {
342 	WRITE_UINT64_FIELD(jobId);
343 	WRITE_NODE_FIELD(jobQuery);
344 	WRITE_NODE_FIELD(taskList);
345 	WRITE_NODE_FIELD(dependentJobList);
346 	WRITE_BOOL_FIELD(subqueryPushdown);
347 	WRITE_BOOL_FIELD(requiresCoordinatorEvaluation);
348 	WRITE_BOOL_FIELD(deferredPruning);
349 	WRITE_NODE_FIELD(partitionKeyValue);
350 	WRITE_NODE_FIELD(localPlannedStatements);
351 	WRITE_BOOL_FIELD(parametersInJobQueryResolved);
352 }
353 
354 
355 void
OutJob(OUTFUNC_ARGS)356 OutJob(OUTFUNC_ARGS)
357 {
358 	WRITE_LOCALS(Job);
359 	WRITE_NODE_TYPE("JOB");
360 
361 	OutJobFields(str, node);
362 }
363 
364 
365 void
OutShardInterval(OUTFUNC_ARGS)366 OutShardInterval(OUTFUNC_ARGS)
367 {
368 	WRITE_LOCALS(ShardInterval);
369 	WRITE_NODE_TYPE("SHARDINTERVAL");
370 
371 	WRITE_OID_FIELD(relationId);
372 	WRITE_CHAR_FIELD(storageType);
373 	WRITE_OID_FIELD(valueTypeId);
374 	WRITE_INT_FIELD(valueTypeLen);
375 	WRITE_BOOL_FIELD(valueByVal);
376 	WRITE_BOOL_FIELD(minValueExists);
377 	WRITE_BOOL_FIELD(maxValueExists);
378 
379 	appendStringInfoString(str, " :minValue ");
380 	if (!node->minValueExists)
381 		appendStringInfoString(str, "<>");
382 	else
383 		outDatum(str, node->minValue, node->valueTypeLen, node->valueByVal);
384 
385 	appendStringInfoString(str, " :maxValue ");
386 	if (!node->maxValueExists)
387 		appendStringInfoString(str, "<>");
388 	else
389 		outDatum(str, node->maxValue, node->valueTypeLen, node->valueByVal);
390 
391 	WRITE_UINT64_FIELD(shardId);
392 	WRITE_INT_FIELD(shardIndex);
393 }
394 
395 
396 void
OutMapMergeJob(OUTFUNC_ARGS)397 OutMapMergeJob(OUTFUNC_ARGS)
398 {
399 	WRITE_LOCALS(MapMergeJob);
400 	int arrayLength = node->sortedShardIntervalArrayLength;
401 	int i;
402 
403 	WRITE_NODE_TYPE("MAPMERGEJOB");
404 
405 	OutJobFields(str, (Job *) node);
406 	WRITE_NODE_FIELD(reduceQuery);
407 	WRITE_ENUM_FIELD(partitionType, PartitionType);
408 	WRITE_NODE_FIELD(partitionColumn);
409 	WRITE_UINT_FIELD(partitionCount);
410 	WRITE_INT_FIELD(sortedShardIntervalArrayLength);
411 
412 	for (i = 0; i < arrayLength; ++i)
413 	{
414 		outNode(str, node->sortedShardIntervalArray[i]);
415 	}
416 
417 	WRITE_NODE_FIELD(mapTaskList);
418 	WRITE_NODE_FIELD(mergeTaskList);
419 }
420 
421 
422 void
OutShardPlacement(OUTFUNC_ARGS)423 OutShardPlacement(OUTFUNC_ARGS)
424 {
425 	WRITE_LOCALS(ShardPlacement);
426 	WRITE_NODE_TYPE("SHARDPLACEMENT");
427 
428 	WRITE_UINT64_FIELD(placementId);
429 	WRITE_UINT64_FIELD(shardId);
430 	WRITE_UINT64_FIELD(shardLength);
431 	WRITE_ENUM_FIELD(shardState, ShardState);
432 	WRITE_INT_FIELD(groupId);
433 	WRITE_STRING_FIELD(nodeName);
434 	WRITE_UINT_FIELD(nodePort);
435 	WRITE_UINT_FIELD(nodeId);
436 	/* so we can deal with 0 */
437 	WRITE_INT_FIELD(partitionMethod);
438 	WRITE_UINT_FIELD(colocationGroupId);
439 	WRITE_UINT_FIELD(representativeValue);
440 }
441 
442 
443 void
OutGroupShardPlacement(OUTFUNC_ARGS)444 OutGroupShardPlacement(OUTFUNC_ARGS)
445 {
446 	WRITE_LOCALS(GroupShardPlacement);
447 	WRITE_NODE_TYPE("GROUPSHARDPLACEMENT");
448 
449 	WRITE_UINT64_FIELD(placementId);
450 	WRITE_UINT64_FIELD(shardId);
451 	WRITE_UINT64_FIELD(shardLength);
452 	WRITE_ENUM_FIELD(shardState, ShardState);
453 	WRITE_INT_FIELD(groupId);
454 }
455 
456 
457 void
OutRelationShard(OUTFUNC_ARGS)458 OutRelationShard(OUTFUNC_ARGS)
459 {
460 	WRITE_LOCALS(RelationShard);
461 	WRITE_NODE_TYPE("RELATIONSHARD");
462 
463 	WRITE_OID_FIELD(relationId);
464 	WRITE_UINT64_FIELD(shardId);
465 }
466 
467 
468 void
OutRelationRowLock(OUTFUNC_ARGS)469 OutRelationRowLock(OUTFUNC_ARGS)
470 {
471 	WRITE_LOCALS(RelationRowLock);
472 	WRITE_NODE_TYPE("RELATIONROWLOCK");
473 
474 	WRITE_OID_FIELD(relationId);
475 	WRITE_ENUM_FIELD(rowLockStrength, LockClauseStrength);
476 }
477 
WriteTaskQuery(OUTFUNC_ARGS)478 static void WriteTaskQuery(OUTFUNC_ARGS) {
479 	WRITE_LOCALS(Task);
480 
481 	WRITE_ENUM_FIELD(taskQuery.queryType, TaskQueryType);
482 
483 	switch (node->taskQuery.queryType)
484 	{
485 		case TASK_QUERY_TEXT:
486 		{
487 			WRITE_STRING_FIELD(taskQuery.data.queryStringLazy);
488 			break;
489 		}
490 
491 		case TASK_QUERY_OBJECT:
492 		{
493 			WRITE_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing);
494 			break;
495 		}
496 
497 		case TASK_QUERY_TEXT_LIST:
498 		{
499 			WRITE_NODE_FIELD(taskQuery.data.queryStringList);
500 			break;
501 		}
502 
503 		default:
504 		{
505 			break;
506 		}
507 	}
508 }
509 
510 void
OutTask(OUTFUNC_ARGS)511 OutTask(OUTFUNC_ARGS)
512 {
513 	WRITE_LOCALS(Task);
514 	WRITE_NODE_TYPE("TASK");
515 
516 	WRITE_ENUM_FIELD(taskType, TaskType);
517 	WRITE_UINT64_FIELD(jobId);
518 	WRITE_UINT_FIELD(taskId);
519 	WriteTaskQuery(str, raw_node);
520 	WRITE_OID_FIELD(anchorDistributedTableId);
521 	WRITE_UINT64_FIELD(anchorShardId);
522 	WRITE_NODE_FIELD(taskPlacementList);
523 	WRITE_NODE_FIELD(dependentTaskList);
524 	WRITE_UINT_FIELD(partitionId);
525 	WRITE_UINT_FIELD(upstreamTaskId);
526 	WRITE_NODE_FIELD(shardInterval);
527 	WRITE_BOOL_FIELD(assignmentConstrained);
528 	WRITE_CHAR_FIELD(replicationModel);
529 	WRITE_BOOL_FIELD(modifyWithSubquery);
530 	WRITE_NODE_FIELD(relationShardList);
531 	WRITE_NODE_FIELD(relationRowLockList);
532 	WRITE_NODE_FIELD(rowValuesLists);
533 	WRITE_BOOL_FIELD(partiallyLocalOrRemote);
534 	WRITE_BOOL_FIELD(parametersInQueryStringResolved);
535 	WRITE_INT_FIELD(queryCount);
536 	WRITE_UINT64_FIELD(totalReceivedTupleData);
537 	WRITE_INT_FIELD(fetchedExplainAnalyzePlacementIndex);
538 	WRITE_STRING_FIELD(fetchedExplainAnalyzePlan);
539 	WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
540 	WRITE_BOOL_FIELD(isLocalTableModification);
541 }
542 
543 
544 void
OutLocalPlannedStatement(OUTFUNC_ARGS)545 OutLocalPlannedStatement(OUTFUNC_ARGS)
546 {
547 	WRITE_LOCALS(LocalPlannedStatement);
548 
549 	WRITE_NODE_TYPE("LocalPlannedStatement");
550 
551 	WRITE_UINT64_FIELD(shardId);
552 	WRITE_UINT_FIELD(localGroupId);
553 	WRITE_NODE_FIELD(localPlan);
554 }
555 
556 void
OutDeferredErrorMessage(OUTFUNC_ARGS)557 OutDeferredErrorMessage(OUTFUNC_ARGS)
558 {
559 	WRITE_LOCALS(DeferredErrorMessage);
560 	WRITE_NODE_TYPE("DEFERREDERRORMESSAGE");
561 
562 	WRITE_INT_FIELD(code);
563 	WRITE_STRING_FIELD(message);
564 	WRITE_STRING_FIELD(detail);
565 	WRITE_STRING_FIELD(hint);
566 	WRITE_STRING_FIELD(filename);
567 	WRITE_INT_FIELD(linenumber);
568 	WRITE_STRING_FIELD(functionname);
569 }
570 
571 
572 void
OutTableDDLCommand(OUTFUNC_ARGS)573 OutTableDDLCommand(OUTFUNC_ARGS)
574 {
575 	WRITE_LOCALS(TableDDLCommand);
576 	WRITE_NODE_TYPE("TableDDLCommand");
577 
578 	switch (node->type)
579 	{
580 		case TABLE_DDL_COMMAND_STRING:
581 		{
582 			WRITE_STRING_FIELD(commandStr);
583 			break;
584 		}
585 
586 		case TABLE_DDL_COMMAND_FUNCTION:
587 		{
588 			char *example = node->function.function(node->function.context);
589 			WRITE_CUSTOM_FIELD(function, example);
590 			break;
591 		}
592 	}
593 }
594