1 /*-------------------------------------------------------------------------
2 *
3 * citus_outfuncs.c
4 * Output functions for Citus tree nodes.
5 *
6 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 * Portions Copyright (c) Citus Data, Inc.
9 *
10 * NOTES
11 * This is a wrapper around postgres' nodeToString() that additionally
12 * supports Citus node types.
13 *
14 * Keep as closely aligned with the upstream version as possible.
15 *
16 *-------------------------------------------------------------------------
17 */
18
19 #include "postgres.h"
20
21 #include "distributed/pg_version_constants.h"
22
23 #include <ctype.h>
24
25 #include "distributed/citus_nodefuncs.h"
26 #include "distributed/citus_nodes.h"
27 #include "distributed/coordinator_protocol.h"
28 #include "distributed/errormessage.h"
29 #include "distributed/log_utils.h"
30 #include "distributed/multi_logical_planner.h"
31 #include "distributed/multi_physical_planner.h"
32 #include "distributed/distributed_planner.h"
33 #include "distributed/multi_server_executor.h"
34 #include "distributed/metadata_utility.h"
35 #include "lib/stringinfo.h"
36 #include "nodes/plannodes.h"
37 #include "nodes/pathnodes.h"
38 #include "utils/datum.h"
39
40
41 /*
42 * Macros to simplify output of different kinds of fields. Use these
43 * wherever possible to reduce the chance for silly typos. Note that these
44 * hard-wire conventions about the names of the local variables in an Out
45 * routine.
46 */
47
/* Bind the raw input node to a typed, const-qualified local named 'node' */
#define WRITE_LOCALS(nodeTypeName) \
	const nodeTypeName *node = (const nodeTypeName *) raw_node

/* Node type label: intentionally expands to a no-op in this file */
#define WRITE_NODE_TYPE(nodelabel) \
	(void) 0

/* Signed integer field, emitted as ":fldname %d" */
#define WRITE_INT_FIELD(fldname) \
	appendStringInfo(str, " :" CppAsString(fldname) " %d", node->fldname)

/* Signed 64-bit integer field, emitted as ":fldname " plus INT64_FORMAT */
#define WRITE_INT64_FIELD(fldname) \
	appendStringInfo(str, " :" CppAsString(fldname) " " INT64_FORMAT, \
					 node->fldname)

/* Unsigned integer field, emitted as ":fldname %u" */
#define WRITE_UINT_FIELD(fldname) \
	appendStringInfo(str, " :" CppAsString(fldname) " %u", node->fldname)

/* XXX: Citus: unsigned 64-bit field, ":fldname " plus UINT64_FORMAT */
#define WRITE_UINT64_FIELD(fldname) \
	appendStringInfo(str, " :" CppAsString(fldname) " " UINT64_FORMAT, \
					 node->fldname)

/*
 * OID field. Kept as a separate macro so callers do not hard-wire the
 * assumption that an OID is the same thing as an unsigned int.
 */
#define WRITE_OID_FIELD(fldname) \
	appendStringInfo(str, " :" CppAsString(fldname) " %u", node->fldname)

/* Single character field, emitted with %c */
#define WRITE_CHAR_FIELD(fldname) \
	appendStringInfo(str, " :" CppAsString(fldname) " %c", node->fldname)

/* Enum field, emitted as its integer code; 'enumtype' is documentation only */
#define WRITE_ENUM_FIELD(fldname, enumtype) \
	appendStringInfo(str, " :" CppAsString(fldname) " %d", \
					 (int) node->fldname)

/* Floating point field; the caller supplies the printf format (precision) */
#define WRITE_FLOAT_FIELD(fldname, format) \
	appendStringInfo(str, " :" CppAsString(fldname) " " format, node->fldname)

/* Boolean field, emitted as the word "true" or "false" */
#define WRITE_BOOL_FIELD(fldname) \
	appendStringInfo(str, " :" CppAsString(fldname) " %s", \
					 booltostr(node->fldname))

/* Character-string field (may be NULL); the value goes through outToken */
#define WRITE_STRING_FIELD(fldname) \
	(appendStringInfoString(str, " :" CppAsString(fldname) " "), \
	 outToken(str, node->fldname))

/* Parse location field; same representation as WRITE_INT_FIELD */
#define WRITE_LOCATION_FIELD(fldname) \
	appendStringInfo(str, " :" CppAsString(fldname) " %d", node->fldname)

/* Nested Node field; the value goes through outNode */
#define WRITE_NODE_FIELD(fldname) \
	(appendStringInfoString(str, " :" CppAsString(fldname) " "), \
	 outNode(str, node->fldname))

/* Bitmapset field; the value goes through _outBitmapset */
#define WRITE_BITMAPSET_FIELD(fldname) \
	(appendStringInfoString(str, " :" CppAsString(fldname) " "), \
	 _outBitmapset(str, node->fldname))

/*
 * Field whose text is computed by the caller. 'fldname' only supplies the
 * label; 'fldvalue' is appended verbatim, without outToken escaping.
 */
#define WRITE_CUSTOM_FIELD(fldname, fldvalue) \
	(appendStringInfoString(str, " :" CppAsString(fldname) " "), \
	 appendStringInfoString(str, (fldvalue)))
118
/*
 * Write an integer array (anything written as ":fldname (%d, %d)").
 *
 * fldname - array member of the local 'node' to print
 * count   - number of elements to print
 *
 * The expansion is wrapped in do { } while (0) so that it behaves as one
 * statement: used as the body of an unbraced if/else or loop, the whole
 * array output is guarded, not just the first call. The loop counter has a
 * macro-private name so it cannot capture an identifier that appears in the
 * caller's 'count' expression, and 'count' is parenthesized.
 */
#define WRITE_INT_ARRAY(fldname, count) \
	do { \
		int writeArrayIndex_; \
		appendStringInfoString(str, " :" CppAsString(fldname) " ("); \
		for (writeArrayIndex_ = 0; writeArrayIndex_ < (count); \
			 writeArrayIndex_++) \
		{ \
			if (writeArrayIndex_ > 0) \
			{ \
				appendStringInfoString(str, ", "); \
			} \
			appendStringInfo(str, "%d", node->fldname[writeArrayIndex_]); \
		} \
		appendStringInfoString(str, ")"); \
	} while (0)


/* Write an enum array; same representation as WRITE_INT_ARRAY */
#define WRITE_ENUM_ARRAY(fldname, count) WRITE_INT_ARRAY(fldname, count)
138
139
/* Map a boolean expression to the literal text "true" or "false" */
#define booltostr(x) ((x) ? "true" : "false")

/* Forward declaration; the definition appears further down in this file */
static void WriteTaskQuery(OUTFUNC_ARGS);
142
143 /*****************************************************************************
144 * Output routines for Citus node types
145 *****************************************************************************/
146
147 static void
OutMultiUnaryNodeFields(StringInfo str,const MultiUnaryNode * node)148 OutMultiUnaryNodeFields(StringInfo str, const MultiUnaryNode *node)
149 {
150 WRITE_NODE_FIELD(childNode);
151 }
152
153
154 static void
OutMultiBinaryNodeFields(StringInfo str,const MultiBinaryNode * node)155 OutMultiBinaryNodeFields(StringInfo str, const MultiBinaryNode *node)
156 {
157 WRITE_NODE_FIELD(leftChildNode);
158 WRITE_NODE_FIELD(rightChildNode);
159 }
160
/*
 * OutMultiNode is the output routine for a plain MultiNode. It writes no
 * fields; WRITE_NODE_TYPE expands to a no-op in this file, so nothing is
 * appended to the output.
 */
void
OutMultiNode(OUTFUNC_ARGS)
{
	WRITE_NODE_TYPE("MULTINODE");
}
166
167
168 void
OutMultiTreeRoot(OUTFUNC_ARGS)169 OutMultiTreeRoot(OUTFUNC_ARGS)
170 {
171 WRITE_LOCALS(MultiTreeRoot);
172
173 WRITE_NODE_TYPE("MULTITREEROOT");
174
175 OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
176 }
177
178
179 void
OutDistributedPlan(OUTFUNC_ARGS)180 OutDistributedPlan(OUTFUNC_ARGS)
181 {
182 WRITE_LOCALS(DistributedPlan);
183
184 WRITE_NODE_TYPE("DISTRIBUTEDPLAN");
185
186 WRITE_UINT64_FIELD(planId);
187 WRITE_ENUM_FIELD(modLevel, RowModifyLevel);
188 WRITE_BOOL_FIELD(expectResults);
189
190 WRITE_NODE_FIELD(workerJob);
191 WRITE_NODE_FIELD(combineQuery);
192 WRITE_UINT64_FIELD(queryId);
193 WRITE_NODE_FIELD(relationIdList);
194 WRITE_OID_FIELD(targetRelationId);
195 WRITE_NODE_FIELD(insertSelectQuery);
196 WRITE_STRING_FIELD(intermediateResultIdPrefix);
197
198 WRITE_NODE_FIELD(subPlanList);
199 WRITE_NODE_FIELD(usedSubPlanNodeList);
200 WRITE_BOOL_FIELD(fastPathRouterPlan);
201 WRITE_UINT_FIELD(numberOfTimesExecuted);
202
203 WRITE_NODE_FIELD(planningError);
204 }
205
206
207 void
OutDistributedSubPlan(OUTFUNC_ARGS)208 OutDistributedSubPlan(OUTFUNC_ARGS)
209 {
210 WRITE_LOCALS(DistributedSubPlan);
211
212 WRITE_NODE_TYPE("DISTRIBUTEDSUBPLAN");
213
214 WRITE_UINT_FIELD(subPlanId);
215 WRITE_NODE_FIELD(plan);
216 }
217
218 void
OutUsedDistributedSubPlan(OUTFUNC_ARGS)219 OutUsedDistributedSubPlan(OUTFUNC_ARGS)
220 {
221 WRITE_LOCALS(UsedDistributedSubPlan);
222
223 WRITE_NODE_TYPE("USEDDISTRIBUTEDSUBPLAN");
224
225 WRITE_STRING_FIELD(subPlanId);
226 WRITE_ENUM_FIELD(accessType, SubPlanAccessType);
227 }
228
229
230 void
OutMultiProject(OUTFUNC_ARGS)231 OutMultiProject(OUTFUNC_ARGS)
232 {
233 WRITE_LOCALS(MultiProject);
234 WRITE_NODE_TYPE("MULTIPROJECT");
235
236 WRITE_NODE_FIELD(columnList);
237
238 OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
239 }
240
241
242 void
OutMultiCollect(OUTFUNC_ARGS)243 OutMultiCollect(OUTFUNC_ARGS)
244 {
245 WRITE_LOCALS(MultiCollect);
246 WRITE_NODE_TYPE("MULTICOLLECT");
247
248 OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
249 }
250
251
252 void
OutMultiSelect(OUTFUNC_ARGS)253 OutMultiSelect(OUTFUNC_ARGS)
254 {
255 WRITE_LOCALS(MultiSelect);
256 WRITE_NODE_TYPE("MULTISELECT");
257
258 WRITE_NODE_FIELD(selectClauseList);
259
260 OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
261 }
262
263
264 void
OutMultiTable(OUTFUNC_ARGS)265 OutMultiTable(OUTFUNC_ARGS)
266 {
267 WRITE_LOCALS(MultiTable);
268 WRITE_NODE_TYPE("MULTITABLE");
269
270 WRITE_OID_FIELD(relationId);
271 WRITE_INT_FIELD(rangeTableId);
272
273 OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
274 }
275
276
277 void
OutMultiJoin(OUTFUNC_ARGS)278 OutMultiJoin(OUTFUNC_ARGS)
279 {
280 WRITE_LOCALS(MultiJoin);
281 WRITE_NODE_TYPE("MULTIJOIN");
282
283 WRITE_NODE_FIELD(joinClauseList);
284 WRITE_ENUM_FIELD(joinRuleType, JoinRuleType);
285 WRITE_ENUM_FIELD(joinType, JoinType);
286
287 OutMultiBinaryNodeFields(str, (const MultiBinaryNode *) node);
288 }
289
290
291 void
OutMultiPartition(OUTFUNC_ARGS)292 OutMultiPartition(OUTFUNC_ARGS)
293 {
294 WRITE_LOCALS(MultiPartition);
295 WRITE_NODE_TYPE("MULTIPARTITION");
296
297 WRITE_NODE_FIELD(partitionColumn);
298
299 OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
300 }
301
302
303 void
OutMultiCartesianProduct(OUTFUNC_ARGS)304 OutMultiCartesianProduct(OUTFUNC_ARGS)
305 {
306 WRITE_LOCALS(MultiCartesianProduct);
307 WRITE_NODE_TYPE("MULTICARTESIANPRODUCT");
308
309 OutMultiBinaryNodeFields(str, (const MultiBinaryNode *) node);
310 }
311
312
313
314
315 void
OutMultiExtendedOp(OUTFUNC_ARGS)316 OutMultiExtendedOp(OUTFUNC_ARGS)
317 {
318 WRITE_LOCALS(MultiExtendedOp);
319 WRITE_NODE_TYPE("MULTIEXTENDEDOP");
320
321 WRITE_NODE_FIELD(targetList);
322 WRITE_NODE_FIELD(groupClauseList);
323 WRITE_NODE_FIELD(sortClauseList);
324 WRITE_NODE_FIELD(limitCount);
325 WRITE_NODE_FIELD(limitOffset);
326 #if PG_VERSION_NUM >= PG_VERSION_13
327 WRITE_ENUM_FIELD(limitOption, LimitOption);
328 #endif
329 WRITE_NODE_FIELD(havingQual);
330 WRITE_BOOL_FIELD(hasDistinctOn);
331 WRITE_NODE_FIELD(distinctClause);
332 WRITE_BOOL_FIELD(hasWindowFuncs);
333 WRITE_BOOL_FIELD(onlyPushableWindowFunctions);
334 WRITE_NODE_FIELD(windowClause);
335
336 OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
337 }
338
339 static void
OutJobFields(StringInfo str,const Job * node)340 OutJobFields(StringInfo str, const Job *node)
341 {
342 WRITE_UINT64_FIELD(jobId);
343 WRITE_NODE_FIELD(jobQuery);
344 WRITE_NODE_FIELD(taskList);
345 WRITE_NODE_FIELD(dependentJobList);
346 WRITE_BOOL_FIELD(subqueryPushdown);
347 WRITE_BOOL_FIELD(requiresCoordinatorEvaluation);
348 WRITE_BOOL_FIELD(deferredPruning);
349 WRITE_NODE_FIELD(partitionKeyValue);
350 WRITE_NODE_FIELD(localPlannedStatements);
351 WRITE_BOOL_FIELD(parametersInJobQueryResolved);
352 }
353
354
355 void
OutJob(OUTFUNC_ARGS)356 OutJob(OUTFUNC_ARGS)
357 {
358 WRITE_LOCALS(Job);
359 WRITE_NODE_TYPE("JOB");
360
361 OutJobFields(str, node);
362 }
363
364
365 void
OutShardInterval(OUTFUNC_ARGS)366 OutShardInterval(OUTFUNC_ARGS)
367 {
368 WRITE_LOCALS(ShardInterval);
369 WRITE_NODE_TYPE("SHARDINTERVAL");
370
371 WRITE_OID_FIELD(relationId);
372 WRITE_CHAR_FIELD(storageType);
373 WRITE_OID_FIELD(valueTypeId);
374 WRITE_INT_FIELD(valueTypeLen);
375 WRITE_BOOL_FIELD(valueByVal);
376 WRITE_BOOL_FIELD(minValueExists);
377 WRITE_BOOL_FIELD(maxValueExists);
378
379 appendStringInfoString(str, " :minValue ");
380 if (!node->minValueExists)
381 appendStringInfoString(str, "<>");
382 else
383 outDatum(str, node->minValue, node->valueTypeLen, node->valueByVal);
384
385 appendStringInfoString(str, " :maxValue ");
386 if (!node->maxValueExists)
387 appendStringInfoString(str, "<>");
388 else
389 outDatum(str, node->maxValue, node->valueTypeLen, node->valueByVal);
390
391 WRITE_UINT64_FIELD(shardId);
392 WRITE_INT_FIELD(shardIndex);
393 }
394
395
396 void
OutMapMergeJob(OUTFUNC_ARGS)397 OutMapMergeJob(OUTFUNC_ARGS)
398 {
399 WRITE_LOCALS(MapMergeJob);
400 int arrayLength = node->sortedShardIntervalArrayLength;
401 int i;
402
403 WRITE_NODE_TYPE("MAPMERGEJOB");
404
405 OutJobFields(str, (Job *) node);
406 WRITE_NODE_FIELD(reduceQuery);
407 WRITE_ENUM_FIELD(partitionType, PartitionType);
408 WRITE_NODE_FIELD(partitionColumn);
409 WRITE_UINT_FIELD(partitionCount);
410 WRITE_INT_FIELD(sortedShardIntervalArrayLength);
411
412 for (i = 0; i < arrayLength; ++i)
413 {
414 outNode(str, node->sortedShardIntervalArray[i]);
415 }
416
417 WRITE_NODE_FIELD(mapTaskList);
418 WRITE_NODE_FIELD(mergeTaskList);
419 }
420
421
422 void
OutShardPlacement(OUTFUNC_ARGS)423 OutShardPlacement(OUTFUNC_ARGS)
424 {
425 WRITE_LOCALS(ShardPlacement);
426 WRITE_NODE_TYPE("SHARDPLACEMENT");
427
428 WRITE_UINT64_FIELD(placementId);
429 WRITE_UINT64_FIELD(shardId);
430 WRITE_UINT64_FIELD(shardLength);
431 WRITE_ENUM_FIELD(shardState, ShardState);
432 WRITE_INT_FIELD(groupId);
433 WRITE_STRING_FIELD(nodeName);
434 WRITE_UINT_FIELD(nodePort);
435 WRITE_UINT_FIELD(nodeId);
436 /* so we can deal with 0 */
437 WRITE_INT_FIELD(partitionMethod);
438 WRITE_UINT_FIELD(colocationGroupId);
439 WRITE_UINT_FIELD(representativeValue);
440 }
441
442
443 void
OutGroupShardPlacement(OUTFUNC_ARGS)444 OutGroupShardPlacement(OUTFUNC_ARGS)
445 {
446 WRITE_LOCALS(GroupShardPlacement);
447 WRITE_NODE_TYPE("GROUPSHARDPLACEMENT");
448
449 WRITE_UINT64_FIELD(placementId);
450 WRITE_UINT64_FIELD(shardId);
451 WRITE_UINT64_FIELD(shardLength);
452 WRITE_ENUM_FIELD(shardState, ShardState);
453 WRITE_INT_FIELD(groupId);
454 }
455
456
457 void
OutRelationShard(OUTFUNC_ARGS)458 OutRelationShard(OUTFUNC_ARGS)
459 {
460 WRITE_LOCALS(RelationShard);
461 WRITE_NODE_TYPE("RELATIONSHARD");
462
463 WRITE_OID_FIELD(relationId);
464 WRITE_UINT64_FIELD(shardId);
465 }
466
467
468 void
OutRelationRowLock(OUTFUNC_ARGS)469 OutRelationRowLock(OUTFUNC_ARGS)
470 {
471 WRITE_LOCALS(RelationRowLock);
472 WRITE_NODE_TYPE("RELATIONROWLOCK");
473
474 WRITE_OID_FIELD(relationId);
475 WRITE_ENUM_FIELD(rowLockStrength, LockClauseStrength);
476 }
477
WriteTaskQuery(OUTFUNC_ARGS)478 static void WriteTaskQuery(OUTFUNC_ARGS) {
479 WRITE_LOCALS(Task);
480
481 WRITE_ENUM_FIELD(taskQuery.queryType, TaskQueryType);
482
483 switch (node->taskQuery.queryType)
484 {
485 case TASK_QUERY_TEXT:
486 {
487 WRITE_STRING_FIELD(taskQuery.data.queryStringLazy);
488 break;
489 }
490
491 case TASK_QUERY_OBJECT:
492 {
493 WRITE_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing);
494 break;
495 }
496
497 case TASK_QUERY_TEXT_LIST:
498 {
499 WRITE_NODE_FIELD(taskQuery.data.queryStringList);
500 break;
501 }
502
503 default:
504 {
505 break;
506 }
507 }
508 }
509
510 void
OutTask(OUTFUNC_ARGS)511 OutTask(OUTFUNC_ARGS)
512 {
513 WRITE_LOCALS(Task);
514 WRITE_NODE_TYPE("TASK");
515
516 WRITE_ENUM_FIELD(taskType, TaskType);
517 WRITE_UINT64_FIELD(jobId);
518 WRITE_UINT_FIELD(taskId);
519 WriteTaskQuery(str, raw_node);
520 WRITE_OID_FIELD(anchorDistributedTableId);
521 WRITE_UINT64_FIELD(anchorShardId);
522 WRITE_NODE_FIELD(taskPlacementList);
523 WRITE_NODE_FIELD(dependentTaskList);
524 WRITE_UINT_FIELD(partitionId);
525 WRITE_UINT_FIELD(upstreamTaskId);
526 WRITE_NODE_FIELD(shardInterval);
527 WRITE_BOOL_FIELD(assignmentConstrained);
528 WRITE_CHAR_FIELD(replicationModel);
529 WRITE_BOOL_FIELD(modifyWithSubquery);
530 WRITE_NODE_FIELD(relationShardList);
531 WRITE_NODE_FIELD(relationRowLockList);
532 WRITE_NODE_FIELD(rowValuesLists);
533 WRITE_BOOL_FIELD(partiallyLocalOrRemote);
534 WRITE_BOOL_FIELD(parametersInQueryStringResolved);
535 WRITE_INT_FIELD(queryCount);
536 WRITE_UINT64_FIELD(totalReceivedTupleData);
537 WRITE_INT_FIELD(fetchedExplainAnalyzePlacementIndex);
538 WRITE_STRING_FIELD(fetchedExplainAnalyzePlan);
539 WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
540 WRITE_BOOL_FIELD(isLocalTableModification);
541 }
542
543
544 void
OutLocalPlannedStatement(OUTFUNC_ARGS)545 OutLocalPlannedStatement(OUTFUNC_ARGS)
546 {
547 WRITE_LOCALS(LocalPlannedStatement);
548
549 WRITE_NODE_TYPE("LocalPlannedStatement");
550
551 WRITE_UINT64_FIELD(shardId);
552 WRITE_UINT_FIELD(localGroupId);
553 WRITE_NODE_FIELD(localPlan);
554 }
555
556 void
OutDeferredErrorMessage(OUTFUNC_ARGS)557 OutDeferredErrorMessage(OUTFUNC_ARGS)
558 {
559 WRITE_LOCALS(DeferredErrorMessage);
560 WRITE_NODE_TYPE("DEFERREDERRORMESSAGE");
561
562 WRITE_INT_FIELD(code);
563 WRITE_STRING_FIELD(message);
564 WRITE_STRING_FIELD(detail);
565 WRITE_STRING_FIELD(hint);
566 WRITE_STRING_FIELD(filename);
567 WRITE_INT_FIELD(linenumber);
568 WRITE_STRING_FIELD(functionname);
569 }
570
571
572 void
OutTableDDLCommand(OUTFUNC_ARGS)573 OutTableDDLCommand(OUTFUNC_ARGS)
574 {
575 WRITE_LOCALS(TableDDLCommand);
576 WRITE_NODE_TYPE("TableDDLCommand");
577
578 switch (node->type)
579 {
580 case TABLE_DDL_COMMAND_STRING:
581 {
582 WRITE_STRING_FIELD(commandStr);
583 break;
584 }
585
586 case TABLE_DDL_COMMAND_FUNCTION:
587 {
588 char *example = node->function.function(node->function.context);
589 WRITE_CUSTOM_FIELD(function, example);
590 break;
591 }
592 }
593 }
594