1 /*-------------------------------------------------------------------------
2  *
3  * recursive_planning.c
4  *
5  * Logic for calling the postgres planner recursively for CTEs and
6  * non-pushdownable subqueries in distributed queries.
7  *
8  * PostgreSQL with Citus can execute 4 types of queries:
9  *
10  * - Postgres queries on local tables and functions.
11  *
12  *   These queries can use all SQL features, but they may not reference
13  *   distributed tables.
14  *
 * - Router queries that can be executed on a single node by replacing
16  *   table names with shard names.
17  *
18  *   These queries can use nearly all SQL features, but only if they have
19  *   a single-valued filter on the distribution column.
20  *
21  * - Multi-shard queries that can be executed by performing a task for each
22  *   shard in a distributed table and performing a merge step.
23  *
24  *   These queries have limited SQL support. They may only include
25  *   subqueries if the subquery can be executed on each shard by replacing
26  *   table names with shard names and concatenating the result.
27  *
28  *   These queries have very limited SQL support and only support basic
29  *   inner joins and subqueries without joins.
30  *
31  * To work around the limitations of these planners, we recursively call
32  * the planner for CTEs and unsupported subqueries to obtain a list of
33  * subplans.
34  *
35  * During execution, each subplan is executed separately through the method
36  * that is appropriate for that query. The results are written to temporary
37  * files on the workers. In the original query, the CTEs and subqueries are
38  * replaced by mini-subqueries that read from the temporary files.
39  *
40  * This allows almost all SQL to be directly or indirectly supported,
41  * because if all subqueries that contain distributed tables have been
42  * replaced then what remains is a router query which can use nearly all
43  * SQL features.
44  *
45  * Copyright (c) Citus Data, Inc.
46  *-------------------------------------------------------------------------
47  */
48 
49 #include "postgres.h"
50 
51 #include "distributed/pg_version_constants.h"
52 
53 #include "funcapi.h"
54 
55 #include "catalog/pg_type.h"
56 #include "catalog/pg_class.h"
57 #include "distributed/citus_nodes.h"
58 #include "distributed/citus_ruleutils.h"
59 #include "distributed/commands/multi_copy.h"
60 #include "distributed/distributed_planner.h"
61 #include "distributed/errormessage.h"
62 #include "distributed/local_distributed_join_planner.h"
63 #include "distributed/listutils.h"
64 #include "distributed/log_utils.h"
65 #include "distributed/metadata_cache.h"
66 #include "distributed/multi_logical_planner.h"
67 #include "distributed/multi_logical_optimizer.h"
68 #include "distributed/multi_router_planner.h"
69 #include "distributed/multi_physical_planner.h"
70 #include "distributed/multi_server_executor.h"
71 #include "distributed/query_colocation_checker.h"
72 #include "distributed/query_pushdown_planning.h"
73 #include "distributed/recursive_planning.h"
74 #include "distributed/relation_restriction_equivalence.h"
75 #include "distributed/log_utils.h"
76 #include "distributed/shard_pruning.h"
77 #include "distributed/version_compat.h"
78 #include "lib/stringinfo.h"
79 #include "optimizer/clauses.h"
80 #include "optimizer/optimizer.h"
81 #include "optimizer/planner.h"
82 #include "optimizer/prep.h"
83 #include "parser/parsetree.h"
84 #include "nodes/makefuncs.h"
85 #include "nodes/nodeFuncs.h"
86 #include "nodes/nodes.h"
87 #include "nodes/nodeFuncs.h"
88 #include "nodes/pg_list.h"
89 #include "nodes/primnodes.h"
90 #include "nodes/pathnodes.h"
91 #include "utils/builtins.h"
92 #include "utils/guc.h"
93 #include "utils/lsyscache.h"
94 
95 /*
96  * RecursivePlanningContext is used to recursively plan subqueries
97  * and CTEs, pull results to the coordinator, and push it back into
98  * the workers.
99  */
100 struct RecursivePlanningContextInternal
101 {
102 	int level;
103 	uint64 planId;
104 	bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */
105 	List *subPlanList;
106 	PlannerRestrictionContext *plannerRestrictionContext;
107 };
108 
/*
 * Depth of the recursive planner invocations that are currently in progress;
 * incremented on entry to GenerateSubplansForSubqueriesAndCTEs and
 * decremented again before it returns or raises its deferred error.
 */
static int recursivePlanningDepth = 0;
111 
112 /*
113  * CteReferenceWalkerContext is used to collect CTE references in
114  * CteReferenceListWalker.
115  */
116 typedef struct CteReferenceWalkerContext
117 {
118 	int level;
119 	List *cteReferenceList;
120 } CteReferenceWalkerContext;
121 
/*
 * VarLevelsUpWalkerContext is the walker state used when looking for Vars
 * in a (sub)query that point at an upper query level; such a (sub)query
 * cannot be planned on its own.
 */
typedef struct VarLevelsUpWalkerContext
{
	/* query nesting level tracked by the walker */
	int level;
} VarLevelsUpWalkerContext;
130 
131 
132 /* local function forward declarations */
133 static DeferredErrorMessage * RecursivelyPlanSubqueriesAndCTEs(Query *query,
134 															   RecursivePlanningContext *
135 															   context);
136 static bool ShouldRecursivelyPlanNonColocatedSubqueries(Query *subquery,
137 														RecursivePlanningContext *
138 														context);
139 static bool ContainsSubquery(Query *query);
140 static void RecursivelyPlanNonColocatedSubqueries(Query *subquery,
141 												  RecursivePlanningContext *context);
142 static void RecursivelyPlanNonColocatedJoinWalker(Node *joinNode,
143 												  ColocatedJoinChecker *
144 												  colocatedJoinChecker,
145 												  RecursivePlanningContext *
146 												  recursivePlanningContext);
147 static void RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
148 														 ColocatedJoinChecker *
149 														 colocatedJoinChecker,
150 														 RecursivePlanningContext *
151 														 recursivePlanningContext);
152 static List * SublinkListFromWhere(Query *originalQuery);
153 static bool ExtractSublinkWalker(Node *node, List **sublinkList);
154 static bool ShouldRecursivelyPlanSublinks(Query *query);
155 static bool RecursivelyPlanAllSubqueries(Node *node,
156 										 RecursivePlanningContext *planningContext);
157 static DeferredErrorMessage * RecursivelyPlanCTEs(Query *query,
158 												  RecursivePlanningContext *context);
159 static bool RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context);
160 static bool ShouldRecursivelyPlanSubquery(Query *subquery,
161 										  RecursivePlanningContext *context);
162 static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery,
163 												  PlannerRestrictionContext *
164 												  restrictionContext);
165 static bool ShouldRecursivelyPlanSetOperation(Query *query,
166 											  RecursivePlanningContext *context);
167 static bool RecursivelyPlanSubquery(Query *subquery,
168 									RecursivePlanningContext *planningContext);
169 static void RecursivelyPlanSetOperations(Query *query, Node *node,
170 										 RecursivePlanningContext *context);
171 static bool IsLocalTableRteOrMatView(Node *node);
172 static DistributedSubPlan * CreateDistributedSubPlan(uint32 subPlanId,
173 													 Query *subPlanQuery);
174 static bool CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context);
175 static bool ContainsReferencesToOuterQueryWalker(Node *node,
176 												 VarLevelsUpWalkerContext *context);
177 static bool NodeContainsSubqueryReferencingOuterQuery(Node *node);
178 static void WrapFunctionsInSubqueries(Query *query);
179 static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry);
180 static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry);
181 static Query * BuildReadIntermediateResultsQuery(List *targetEntryList,
182 												 List *columnAliasList,
183 												 Const *resultIdConst, Oid functionOid,
184 												 bool useBinaryCopyFormat);
185 static void UpdateVarNosInNode(Node *node, Index newVarNo);
186 static Query * CreateOuterSubquery(RangeTblEntry *rangeTableEntry,
187 								   List *outerSubqueryTargetList);
188 static List * GenerateRequiredColNamesFromTargetList(List *targetList);
189 static char * GetRelationNameAndAliasName(RangeTblEntry *rangeTablentry);
190 
191 /*
192  * GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs.
193  * The function returns the subplans if necessary. For the details of when/how subplans are
194  * generated, see RecursivelyPlanSubqueriesAndCTEs().
195  *
196  * Note that the input originalQuery query is modified if any subplans are generated.
197  */
198 List *
GenerateSubplansForSubqueriesAndCTEs(uint64 planId,Query * originalQuery,PlannerRestrictionContext * plannerRestrictionContext)199 GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
200 									 PlannerRestrictionContext *plannerRestrictionContext)
201 {
202 	RecursivePlanningContext context;
203 
204 	recursivePlanningDepth++;
205 
206 	/*
207 	 * Plan subqueries and CTEs that cannot be pushed down by recursively
208 	 * calling the planner and add the resulting plans to subPlanList.
209 	 */
210 	context.level = 0;
211 	context.planId = planId;
212 	context.subPlanList = NIL;
213 	context.plannerRestrictionContext = plannerRestrictionContext;
214 
215 	/*
216 	 * Calculating the distribution key equality upfront is a trade-off for us.
217 	 *
218 	 * When the originalQuery contains the distribution key equality, we'd be
219 	 * able to skip further checks for each lower level subqueries (i.e., if the
220 	 * all query contains distribution key equality, each subquery also contains
221 	 * distribution key equality.)
222 	 *
223 	 * When the originalQuery doesn't contain the distribution key equality,
224 	 * calculating this wouldn't help us at all, we should individually check
225 	 * each each subquery and subquery joins among subqueries.
226 	 */
227 	context.allDistributionKeysInQueryAreEqual =
228 		AllDistributionKeysInQueryAreEqual(originalQuery, plannerRestrictionContext);
229 
230 	DeferredErrorMessage *error = RecursivelyPlanSubqueriesAndCTEs(originalQuery,
231 																   &context);
232 	if (error != NULL)
233 	{
234 		recursivePlanningDepth--;
235 		RaiseDeferredError(error, ERROR);
236 	}
237 
238 	if (context.subPlanList && IsLoggableLevel(DEBUG1))
239 	{
240 		StringInfo subPlanString = makeStringInfo();
241 		pg_get_query_def(originalQuery, subPlanString);
242 		ereport(DEBUG1, (errmsg(
243 							 "Plan " UINT64_FORMAT
244 							 " query after replacing subqueries and CTEs: %s", planId,
245 							 ApplyLogRedaction(subPlanString->data))));
246 	}
247 
248 	recursivePlanningDepth--;
249 
250 	return context.subPlanList;
251 }
252 
253 
254 /*
255  * RecursivelyPlanSubqueriesAndCTEs finds subqueries and CTEs that cannot be pushed down to
256  * workers directly and instead plans them by recursively calling the planner and
257  * adding the subplan to subPlanList.
258  *
259  * Subplans are executed prior to the distributed plan and the results are written
260  * to temporary files on workers.
261  *
262  * CTE references are replaced by a subquery on the read_intermediate_result
263  * function, which reads from the temporary file.
264  *
265  * If recursive planning results in an error then the error is returned. Otherwise, the
266  * subplans will be added to subPlanList.
267  */
268 static DeferredErrorMessage *
RecursivelyPlanSubqueriesAndCTEs(Query * query,RecursivePlanningContext * context)269 RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context)
270 {
271 	DeferredErrorMessage *error = RecursivelyPlanCTEs(query, context);
272 	if (error != NULL)
273 	{
274 		return error;
275 	}
276 
277 	if (SubqueryPushdown)
278 	{
279 		/*
280 		 * When the subquery_pushdown flag is enabled we make some hacks
281 		 * to push down subqueries with LIMIT. Recursive planning would
282 		 * valiantly do the right thing and try to recursively plan the
283 		 * inner subqueries, but we don't really want it to because those
284 		 * subqueries might not be supported and would be much slower.
285 		 *
286 		 * Instead, we skip recursive planning altogether when
287 		 * subquery_pushdown is enabled.
288 		 */
289 		return NULL;
290 	}
291 
292 	/* make sure function calls in joins are executed in the coordinator */
293 	WrapFunctionsInSubqueries(query);
294 
295 	/* descend into subqueries */
296 	query_tree_walker(query, RecursivelyPlanSubqueryWalker, context, 0);
297 
298 	/*
299 	 * At this point, all CTEs, leaf subqueries containing local tables and
300 	 * non-pushdownable subqueries have been replaced. We now check for
301 	 * combinations of subqueries that cannot be pushed down (e.g.
302 	 * <subquery on reference table> UNION <subquery on distributed table>).
303 	 *
304 	 * This code also runs for the top-level query, which allows us to support
305 	 * top-level set operations.
306 	 */
307 
308 	if (ShouldRecursivelyPlanSetOperation(query, context))
309 	{
310 		RecursivelyPlanSetOperations(query, (Node *) query->setOperations, context);
311 	}
312 
313 	/*
314 	 * If the FROM clause is recurring (does not contain a distributed table),
315 	 * then we cannot have any distributed tables appearing in subqueries in
316 	 * the SELECT and WHERE clauses.
317 	 */
318 	if (ShouldRecursivelyPlanSublinks(query))
319 	{
320 		/* replace all subqueries in the WHERE clause */
321 		if (query->jointree && query->jointree->quals)
322 		{
323 			RecursivelyPlanAllSubqueries((Node *) query->jointree->quals, context);
324 		}
325 
326 		/* replace all subqueries in the SELECT clause */
327 		RecursivelyPlanAllSubqueries((Node *) query->targetList, context);
328 	}
329 
330 	if (query->havingQual != NULL)
331 	{
332 		if (NodeContainsSubqueryReferencingOuterQuery(query->havingQual))
333 		{
334 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
335 								 "Subqueries in HAVING cannot refer to outer query",
336 								 NULL, NULL);
337 		}
338 
339 		RecursivelyPlanAllSubqueries(query->havingQual, context);
340 	}
341 
342 	/*
343 	 * If the query doesn't have distribution key equality,
344 	 * recursively plan some of its subqueries.
345 	 */
346 	if (ShouldRecursivelyPlanNonColocatedSubqueries(query, context))
347 	{
348 		RecursivelyPlanNonColocatedSubqueries(query, context);
349 	}
350 
351 
352 	if (ShouldConvertLocalTableJoinsToSubqueries(query->rtable))
353 	{
354 		/*
355 		 * Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", or
356 		 * a query with local table/citus local table and subquery. We convert local/citus local
357 		 * tables to a subquery until they can be planned.
358 		 */
359 		RecursivelyPlanLocalTableJoins(query, context);
360 	}
361 
362 
363 	return NULL;
364 }
365 
366 
367 /*
368  * GetPlannerRestrictionContext returns the planner restriction context
369  * from the given context.
370  */
371 PlannerRestrictionContext *
GetPlannerRestrictionContext(RecursivePlanningContext * recursivePlanningContext)372 GetPlannerRestrictionContext(RecursivePlanningContext *recursivePlanningContext)
373 {
374 	return recursivePlanningContext->plannerRestrictionContext;
375 }
376 
377 
378 /*
379  * ShouldRecursivelyPlanNonColocatedSubqueries returns true if the input query contains joins
380  * that are not on the distribution key.
381  * *
382  * Note that at the point that this function is called, we've already recursively planned all
383  * the leaf subqueries. Thus, we're actually checking whether the joins among the subqueries
384  * on the distribution key or not.
385  */
386 static bool
ShouldRecursivelyPlanNonColocatedSubqueries(Query * subquery,RecursivePlanningContext * context)387 ShouldRecursivelyPlanNonColocatedSubqueries(Query *subquery,
388 											RecursivePlanningContext *context)
389 {
390 	/*
391 	 * If the input query already contains the equality, simply return since it is not
392 	 * possible to find any non colocated subqueries.
393 	 */
394 	if (context->allDistributionKeysInQueryAreEqual)
395 	{
396 		return false;
397 	}
398 
399 	/*
400 	 * This check helps us in two ways:
401 	 *   (i) We're not targeting queries that don't include subqueries at all,
402 	 *       they should go through regular planning.
403 	 *  (ii) Lower level subqueries are already recursively planned, so we should
404 	 *       only bother non-colocated subquery joins, which only happens when
405 	 *       there are subqueries.
406 	 */
407 	if (!ContainsSubquery(subquery))
408 	{
409 		return false;
410 	}
411 
412 	/* direct joins with local tables are not supported by any of Citus planners */
413 	if (FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable,
414 													  IsLocalTableRteOrMatView))
415 	{
416 		return false;
417 	}
418 
419 	/*
420 	 * Finally, check whether this subquery contains distribution key equality or not.
421 	 */
422 	if (!AllDistributionKeysInSubqueryAreEqual(subquery,
423 											   context->plannerRestrictionContext))
424 	{
425 		return true;
426 	}
427 
428 	return false;
429 }
430 
431 
432 /*
433  * ContainsSubquery returns true if the input query contains any subqueries
434  * in the FROM or WHERE clauses.
435  */
436 static bool
ContainsSubquery(Query * query)437 ContainsSubquery(Query *query)
438 {
439 	return JoinTreeContainsSubquery(query) || WhereOrHavingClauseContainsSubquery(query);
440 }
441 
442 
443 /*
444  * RecursivelyPlanNonColocatedSubqueries gets a query which includes one or more
445  * other subqueries that are not joined on their distribution keys. The function
446  * tries to recursively plan some of the subqueries to make the input query
447  * executable by Citus.
448  *
449  * The function picks an anchor subquery and iterates on the remaining subqueries.
450  * Whenever it finds a non colocated subquery with the anchor subquery, the function
451  * decides to recursively plan the non colocated subquery.
452  *
453  * The function first handles subqueries in FROM clause (i.e., jointree->fromlist) and then
454  * subqueries in WHERE clause (i.e., jointree->quals).
455  *
456  * The function does not treat outer joins seperately. Thus, we might end up with
457  * a query where the function decides to recursively plan an outer side of an outer
458  * join (i.e., LEFT side of LEFT JOIN). For simplicity, we chose to do so and handle
459  * outer joins with a seperate pass on the join tree.
460  */
461 static void
RecursivelyPlanNonColocatedSubqueries(Query * subquery,RecursivePlanningContext * context)462 RecursivelyPlanNonColocatedSubqueries(Query *subquery, RecursivePlanningContext *context)
463 {
464 	FromExpr *joinTree = subquery->jointree;
465 
466 	/* create the context for the non colocated subquery planning */
467 	PlannerRestrictionContext *restrictionContext = context->plannerRestrictionContext;
468 	ColocatedJoinChecker colocatedJoinChecker = CreateColocatedJoinChecker(subquery,
469 																		   restrictionContext);
470 
471 	/*
472 	 * Although this is a rare case, we weren't able to pick an anchor
473 	 * range table entry, so we cannot continue.
474 	 */
475 	if (colocatedJoinChecker.anchorRelationRestrictionList == NIL)
476 	{
477 		return;
478 	}
479 
480 	/* handle from clause subqueries first */
481 	RecursivelyPlanNonColocatedJoinWalker((Node *) joinTree, &colocatedJoinChecker,
482 										  context);
483 
484 	/* handle subqueries in WHERE clause */
485 	RecursivelyPlanNonColocatedSubqueriesInWhere(subquery, &colocatedJoinChecker,
486 												 context);
487 }
488 
489 
490 /*
491  * RecursivelyPlanNonColocatedJoinWalker gets a join node and walks over it to find
492  * subqueries that live under the node.
493  *
494  * When a subquery found, it's checked whether the subquery is colocated with the
495  * anchor subquery specified in the nonColocatedJoinContext. If not,
496  * the subquery is recursively planned.
497  */
498 static void
RecursivelyPlanNonColocatedJoinWalker(Node * joinNode,ColocatedJoinChecker * colocatedJoinChecker,RecursivePlanningContext * recursivePlanningContext)499 RecursivelyPlanNonColocatedJoinWalker(Node *joinNode,
500 									  ColocatedJoinChecker *colocatedJoinChecker,
501 									  RecursivePlanningContext *recursivePlanningContext)
502 {
503 	if (joinNode == NULL)
504 	{
505 		return;
506 	}
507 	else if (IsA(joinNode, FromExpr))
508 	{
509 		FromExpr *fromExpr = (FromExpr *) joinNode;
510 		ListCell *fromExprCell;
511 
512 		/*
513 		 * For each element of the from list, check whether the element is
514 		 * colocated with the anchor subquery by recursing until we
515 		 * find the subqueries.
516 		 */
517 		foreach(fromExprCell, fromExpr->fromlist)
518 		{
519 			Node *fromElement = (Node *) lfirst(fromExprCell);
520 
521 			RecursivelyPlanNonColocatedJoinWalker(fromElement, colocatedJoinChecker,
522 												  recursivePlanningContext);
523 		}
524 	}
525 	else if (IsA(joinNode, JoinExpr))
526 	{
527 		JoinExpr *joinExpr = (JoinExpr *) joinNode;
528 
529 		/* recurse into the left subtree */
530 		RecursivelyPlanNonColocatedJoinWalker(joinExpr->larg, colocatedJoinChecker,
531 											  recursivePlanningContext);
532 
533 		/* recurse into the right subtree */
534 		RecursivelyPlanNonColocatedJoinWalker(joinExpr->rarg, colocatedJoinChecker,
535 											  recursivePlanningContext);
536 	}
537 	else if (IsA(joinNode, RangeTblRef))
538 	{
539 		int rangeTableIndex = ((RangeTblRef *) joinNode)->rtindex;
540 		List *rangeTableList = colocatedJoinChecker->subquery->rtable;
541 		RangeTblEntry *rte = rt_fetch(rangeTableIndex, rangeTableList);
542 
543 		/* we're only interested in subqueries for now */
544 		if (rte->rtekind != RTE_SUBQUERY)
545 		{
546 			return;
547 		}
548 
549 		/*
550 		 * If the subquery is not colocated with the anchor subquery,
551 		 * recursively plan it.
552 		 */
553 		Query *subquery = rte->subquery;
554 		if (!SubqueryColocated(subquery, colocatedJoinChecker))
555 		{
556 			RecursivelyPlanSubquery(subquery, recursivePlanningContext);
557 		}
558 	}
559 	else
560 	{
561 		pg_unreachable();
562 	}
563 }
564 
565 
566 /*
567  * RecursivelyPlanNonColocatedJoinWalker gets a query and walks over its sublinks
568  * to find subqueries that live in WHERE clause.
569  *
570  * When a subquery found, it's checked whether the subquery is colocated with the
571  * anchor subquery specified in the nonColocatedJoinContext. If not,
572  * the subquery is recursively planned.
573  */
574 static void
RecursivelyPlanNonColocatedSubqueriesInWhere(Query * query,ColocatedJoinChecker * colocatedJoinChecker,RecursivePlanningContext * recursivePlanningContext)575 RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
576 											 ColocatedJoinChecker *colocatedJoinChecker,
577 											 RecursivePlanningContext *
578 											 recursivePlanningContext)
579 {
580 	List *sublinkList = SublinkListFromWhere(query);
581 	ListCell *sublinkCell = NULL;
582 
583 	foreach(sublinkCell, sublinkList)
584 	{
585 		SubLink *sublink = (SubLink *) lfirst(sublinkCell);
586 		Query *subselect = (Query *) sublink->subselect;
587 
588 		/* subselect is probably never NULL, but anyway lets keep the check */
589 		if (subselect == NULL)
590 		{
591 			continue;
592 		}
593 
594 		if (!SubqueryColocated(subselect, colocatedJoinChecker))
595 		{
596 			RecursivelyPlanSubquery(subselect, recursivePlanningContext);
597 		}
598 	}
599 }
600 
601 
602 /*
603  * SublinkListFromWhere finds the subquery nodes in the where clause of the given query. Note
604  * that the function should be called on the original query given that postgres
605  * standard_planner() may convert the subqueries in WHERE clause to joins.
606  */
607 static List *
SublinkListFromWhere(Query * originalQuery)608 SublinkListFromWhere(Query *originalQuery)
609 {
610 	FromExpr *joinTree = originalQuery->jointree;
611 	List *sublinkList = NIL;
612 
613 	if (!joinTree)
614 	{
615 		return NIL;
616 	}
617 
618 	Node *queryQuals = joinTree->quals;
619 	ExtractSublinkWalker(queryQuals, &sublinkList);
620 
621 	return sublinkList;
622 }
623 
624 
625 /*
626  * ExtractSublinkWalker walks over a quals node, and finds all sublinks
627  * in that node.
628  */
629 static bool
ExtractSublinkWalker(Node * node,List ** sublinkList)630 ExtractSublinkWalker(Node *node, List **sublinkList)
631 {
632 	bool walkerResult = false;
633 	if (node == NULL)
634 	{
635 		return false;
636 	}
637 
638 	if (IsA(node, SubLink))
639 	{
640 		(*sublinkList) = lappend(*sublinkList, node);
641 	}
642 	else
643 	{
644 		walkerResult = expression_tree_walker(node, ExtractSublinkWalker,
645 											  sublinkList);
646 	}
647 
648 	return walkerResult;
649 }
650 
651 
652 /*
653  * ShouldRecursivelyPlanSublinks returns true if the query has a recurring
654  * FROM clause.
655  */
656 static bool
ShouldRecursivelyPlanSublinks(Query * query)657 ShouldRecursivelyPlanSublinks(Query *query)
658 {
659 	if (FindNodeMatchingCheckFunctionInRangeTableList(query->rtable,
660 													  IsDistributedTableRTE))
661 	{
662 		/* there is a distributed table in the FROM clause */
663 		return false;
664 	}
665 
666 	return true;
667 }
668 
669 
670 /*
671  * RecursivelyPlanAllSubqueries descends into an expression tree and recursively
672  * plans all subqueries that contain at least one distributed table. The recursive
673  * planning starts from the top of the input query.
674  */
675 static bool
RecursivelyPlanAllSubqueries(Node * node,RecursivePlanningContext * planningContext)676 RecursivelyPlanAllSubqueries(Node *node, RecursivePlanningContext *planningContext)
677 {
678 	if (node == NULL)
679 	{
680 		return false;
681 	}
682 
683 	if (IsA(node, Query))
684 	{
685 		Query *query = (Query *) node;
686 		if (FindNodeMatchingCheckFunctionInRangeTableList(query->rtable, IsCitusTableRTE))
687 		{
688 			RecursivelyPlanSubquery(query, planningContext);
689 		}
690 
691 		return false;
692 	}
693 
694 	return expression_tree_walker(node, RecursivelyPlanAllSubqueries, planningContext);
695 }
696 
697 
698 /*
699  * RecursivelyPlanCTEs plans all CTEs in the query by recursively calling the planner
700  * The resulting plan is added to planningContext->subPlanList and CTE references
701  * are replaced by subqueries that call read_intermediate_result, which reads the
702  * intermediate result of the CTE after it is executed.
703  *
704  * Recursive and modifying CTEs are not yet supported and return an error.
705  */
706 static DeferredErrorMessage *
RecursivelyPlanCTEs(Query * query,RecursivePlanningContext * planningContext)707 RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
708 {
709 	ListCell *cteCell = NULL;
710 	CteReferenceWalkerContext context = { -1, NIL };
711 
712 	if (query->cteList == NIL)
713 	{
714 		/* no CTEs, nothing to do */
715 		return NULL;
716 	}
717 
718 	if (query->hasRecursive)
719 	{
720 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
721 							 "recursive CTEs are not supported in distributed "
722 							 "queries",
723 							 NULL, NULL);
724 	}
725 
726 	/* get all RTE_CTEs that point to CTEs from cteList */
727 	CteReferenceListWalker((Node *) query, &context);
728 
729 	foreach(cteCell, query->cteList)
730 	{
731 		CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell);
732 		char *cteName = cte->ctename;
733 		Query *subquery = (Query *) cte->ctequery;
734 		uint64 planId = planningContext->planId;
735 		List *cteTargetList = NIL;
736 		ListCell *rteCell = NULL;
737 		int replacedCtesCount = 0;
738 
739 		if (ContainsReferencesToOuterQuery(subquery))
740 		{
741 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
742 								 "CTEs that refer to other subqueries are not "
743 								 "supported in multi-shard queries",
744 								 NULL, NULL);
745 		}
746 
747 		if (cte->cterefcount == 0 && subquery->commandType == CMD_SELECT)
748 		{
749 			/*
750 			 * SELECT CTEs that aren't referenced aren't executed in postgres.
751 			 * We don't need to generate a subplan for it and can take the rest
752 			 * of this iteration off.
753 			 */
754 			continue;
755 		}
756 
757 		uint32 subPlanId = list_length(planningContext->subPlanList) + 1;
758 
759 		if (IsLoggableLevel(DEBUG1))
760 		{
761 			StringInfo subPlanString = makeStringInfo();
762 			pg_get_query_def(subquery, subPlanString);
763 			ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT
764 									"_%u for CTE %s: %s", planId, subPlanId,
765 									cteName,
766 									ApplyLogRedaction(subPlanString->data))));
767 		}
768 
769 		/* build a sub plan for the CTE */
770 		DistributedSubPlan *subPlan = CreateDistributedSubPlan(subPlanId, subquery);
771 		planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan);
772 
773 		/* build the result_id parameter for the call to read_intermediate_result */
774 		char *resultId = GenerateResultId(planId, subPlanId);
775 
776 		if (subquery->returningList)
777 		{
778 			/* modifying CTE with returning */
779 			cteTargetList = subquery->returningList;
780 		}
781 		else
782 		{
783 			/* regular SELECT CTE */
784 			cteTargetList = subquery->targetList;
785 		}
786 
787 		/* replace references to the CTE with a subquery that reads results */
788 		Query *resultQuery = BuildSubPlanResultQuery(cteTargetList, cte->aliascolnames,
789 													 resultId);
790 
791 		foreach(rteCell, context.cteReferenceList)
792 		{
793 			RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rteCell);
794 
795 			if (rangeTableEntry->rtekind != RTE_CTE)
796 			{
797 				/*
798 				 * This RTE pointed to a preceding CTE that was already replaced
799 				 * by a subplan.
800 				 */
801 				continue;
802 			}
803 
804 			if (strncmp(rangeTableEntry->ctename, cteName, NAMEDATALEN) == 0)
805 			{
806 				/* change the RTE_CTE into an RTE_SUBQUERY */
807 				rangeTableEntry->rtekind = RTE_SUBQUERY;
808 				rangeTableEntry->ctename = NULL;
809 				rangeTableEntry->ctelevelsup = 0;
810 
811 				if (replacedCtesCount == 0)
812 				{
813 					/*
814 					 * Replace the first CTE reference with the result query directly.
815 					 */
816 					rangeTableEntry->subquery = resultQuery;
817 				}
818 				else
819 				{
820 					/*
821 					 * Replace subsequent CTE references with a copy of the result
822 					 * query.
823 					 */
824 					rangeTableEntry->subquery = copyObject(resultQuery);
825 				}
826 
827 				replacedCtesCount++;
828 			}
829 		}
830 
831 		Assert(cte->cterefcount == replacedCtesCount);
832 	}
833 
834 	/*
835 	 * All CTEs are now executed through subplans and RTE_CTEs pointing
836 	 * to the CTE list have been replaced with subqueries. We can now
837 	 * clear the cteList.
838 	 */
839 	query->cteList = NIL;
840 
841 	return NULL;
842 }
843 
844 
845 /*
846  * RecursivelyPlanSubqueryWalker recursively finds all the Query nodes and
847  * recursively plans if necessary.
848  */
849 static bool
RecursivelyPlanSubqueryWalker(Node * node,RecursivePlanningContext * context)850 RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context)
851 {
852 	if (node == NULL)
853 	{
854 		return false;
855 	}
856 
857 	if (IsA(node, Query))
858 	{
859 		Query *query = (Query *) node;
860 
861 		context->level += 1;
862 
863 		/*
864 		 * First, make sure any subqueries and CTEs within this subquery
865 		 * are recursively planned if necessary.
866 		 */
867 		DeferredErrorMessage *error = RecursivelyPlanSubqueriesAndCTEs(query, context);
868 		if (error != NULL)
869 		{
870 			RaiseDeferredError(error, ERROR);
871 		}
872 		context->level -= 1;
873 
874 		/*
875 		 * Recursively plan this subquery if it cannot be pushed down and is
876 		 * eligible for recursive planning.
877 		 */
878 		if (ShouldRecursivelyPlanSubquery(query, context))
879 		{
880 			RecursivelyPlanSubquery(query, context);
881 		}
882 
883 		/* we're done, no need to recurse anymore for this query */
884 		return false;
885 	}
886 
887 	return expression_tree_walker(node, RecursivelyPlanSubqueryWalker, context);
888 }
889 
890 
891 /*
892  * ShouldRecursivelyPlanSubquery decides whether the input subquery should be recursively
893  * planned or not.
894  *
895  * For the details, see the cases in the function.
896  */
897 static bool
ShouldRecursivelyPlanSubquery(Query * subquery,RecursivePlanningContext * context)898 ShouldRecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *context)
899 {
900 	if (FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable,
901 													  IsLocalTableRteOrMatView))
902 	{
903 		/*
904 		 * Postgres can always plan queries that don't require distributed planning.
905 		 * Note that we need to check this first, otherwise the calls to the many other
906 		 * Citus planner functions would error our due to local relations.
907 		 *
908 		 * TODO: We could only successfully create distributed plans with local tables
909 		 * when the local tables are on the leaf queries and the upper level queries
910 		 * do not contain any other local tables.
911 		 */
912 	}
913 	else if (CanPushdownSubquery(subquery, false))
914 	{
915 		/*
916 		 * We should do one more check for the distribution key equality.
917 		 *
918 		 * If the input query to the planner doesn't contain distribution key equality,
919 		 * we should further check whether this individual subquery contains or not.
920 		 *
921 		 * If all relations are not joined on their distribution keys for the given
922 		 * subquery, we cannot push push it down and therefore we should try to
923 		 * recursively plan it.
924 		 */
925 		if (!context->allDistributionKeysInQueryAreEqual &&
926 			!AllDistributionKeysInSubqueryAreEqual(subquery,
927 												   context->plannerRestrictionContext))
928 		{
929 			return true;
930 		}
931 
932 		/*
933 		 * Citus can pushdown this subquery, no need to recursively
934 		 * plan which is much more expensive than pushdown.
935 		 */
936 		return false;
937 	}
938 
939 	return true;
940 }
941 
942 
943 /*
944  * AllDistributionKeysInSubqueryAreEqual is a wrapper function
945  * for AllDistributionKeysInQueryAreEqual(). Here, we filter the
946  * planner restrictions for the given subquery and do the restriction
947  * equality checks on the filtered restriction.
948  */
949 static bool
AllDistributionKeysInSubqueryAreEqual(Query * subquery,PlannerRestrictionContext * restrictionContext)950 AllDistributionKeysInSubqueryAreEqual(Query *subquery,
951 									  PlannerRestrictionContext *restrictionContext)
952 {
953 	/* we don't support distribution eq. checks for CTEs yet */
954 	if (subquery->cteList != NIL)
955 	{
956 		return false;
957 	}
958 
959 	PlannerRestrictionContext *filteredRestrictionContext =
960 		FilterPlannerRestrictionForQuery(restrictionContext, subquery);
961 
962 	bool allDistributionKeysInSubqueryAreEqual =
963 		AllDistributionKeysInQueryAreEqual(subquery, filteredRestrictionContext);
964 	if (!allDistributionKeysInSubqueryAreEqual)
965 	{
966 		return false;
967 	}
968 
969 	return true;
970 }
971 
972 
973 /*
974  * ShouldRecursivelyPlanSetOperation determines whether the leaf queries of a
975  * set operations tree need to be recursively planned in order to support the
976  * query as a whole.
977  */
978 static bool
ShouldRecursivelyPlanSetOperation(Query * query,RecursivePlanningContext * context)979 ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *context)
980 {
981 	SetOperationStmt *setOperations = (SetOperationStmt *) query->setOperations;
982 	if (setOperations == NULL)
983 	{
984 		return false;
985 	}
986 
987 	if (context->level == 0)
988 	{
989 		/*
990 		 * We cannot push down top-level set operation. Recursively plan the
991 		 * leaf nodes such that it becomes a router query.
992 		 */
993 		return true;
994 	}
995 
996 	if (setOperations->op != SETOP_UNION)
997 	{
998 		/*
999 		 * We can only push down UNION operaionts, plan other set operations
1000 		 * recursively.
1001 		 */
1002 		return true;
1003 	}
1004 
1005 	if (DeferErrorIfUnsupportedUnionQuery(query) != NULL)
1006 	{
1007 		/*
1008 		 * If at least one leaf query in the union is recurring, then all
1009 		 * leaf nodes need to be recurring.
1010 		 */
1011 		return true;
1012 	}
1013 
1014 	PlannerRestrictionContext *filteredRestrictionContext =
1015 		FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query);
1016 	if (!SafeToPushdownUnionSubquery(query, filteredRestrictionContext))
1017 	{
1018 		/*
1019 		 * The distribution column is not in the same place in all sides
1020 		 * of the union, meaning we cannot determine distribution column
1021 		 * equivalence. Recursive planning is necessary.
1022 		 */
1023 		return true;
1024 	}
1025 
1026 	return false;
1027 }
1028 
1029 
1030 /*
1031  * RecursivelyPlanSetOperations descends into a tree of set operations
1032  * (e.g. UNION, INTERSECTS) and recursively plans all leaf nodes that
1033  * contain distributed tables.
1034  */
1035 static void
RecursivelyPlanSetOperations(Query * query,Node * node,RecursivePlanningContext * context)1036 RecursivelyPlanSetOperations(Query *query, Node *node,
1037 							 RecursivePlanningContext *context)
1038 {
1039 	if (IsA(node, SetOperationStmt))
1040 	{
1041 		SetOperationStmt *setOperations = (SetOperationStmt *) node;
1042 
1043 		RecursivelyPlanSetOperations(query, setOperations->larg, context);
1044 		RecursivelyPlanSetOperations(query, setOperations->rarg, context);
1045 	}
1046 	else if (IsA(node, RangeTblRef))
1047 	{
1048 		RangeTblRef *rangeTableRef = (RangeTblRef *) node;
1049 		RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableRef->rtindex,
1050 												  query->rtable);
1051 		Query *subquery = rangeTableEntry->subquery;
1052 
1053 		if (rangeTableEntry->rtekind == RTE_SUBQUERY &&
1054 			FindNodeMatchingCheckFunction((Node *) subquery, IsDistributedTableRTE))
1055 		{
1056 			RecursivelyPlanSubquery(subquery, context);
1057 		}
1058 	}
1059 	else
1060 	{
1061 		ereport(ERROR, (errmsg("unexpected node type (%d) while "
1062 							   "expecting set operations or "
1063 							   "range table references", nodeTag(node))));
1064 	}
1065 }
1066 
1067 
1068 /*
1069  * IsLocalTableRteOrMatView gets a node and returns true if the node is a range
1070  * table entry that points to a postgres local or citus local table or to a
1071  * materialized view.
1072  */
1073 static bool
IsLocalTableRteOrMatView(Node * node)1074 IsLocalTableRteOrMatView(Node *node)
1075 {
1076 	if (node == NULL)
1077 	{
1078 		return false;
1079 	}
1080 
1081 	if (!IsA(node, RangeTblEntry))
1082 	{
1083 		return false;
1084 	}
1085 
1086 	RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
1087 	if (rangeTableEntry->rtekind != RTE_RELATION)
1088 	{
1089 		return false;
1090 	}
1091 
1092 	if (rangeTableEntry->relkind == RELKIND_VIEW)
1093 	{
1094 		return false;
1095 	}
1096 
1097 	Oid relationId = rangeTableEntry->relid;
1098 	return IsRelationLocalTableOrMatView(relationId);
1099 }
1100 
1101 
1102 /*
1103  * IsRelationLocalTableOrMatView returns true if the given relation
1104  * is a citus local, local, or materialized view.
1105  */
1106 bool
IsRelationLocalTableOrMatView(Oid relationId)1107 IsRelationLocalTableOrMatView(Oid relationId)
1108 {
1109 	if (!IsCitusTable(relationId))
1110 	{
1111 		/* postgres local table or a materialized view */
1112 		return true;
1113 	}
1114 	else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
1115 	{
1116 		return true;
1117 	}
1118 
1119 	/* no local table found */
1120 	return false;
1121 }
1122 
1123 
1124 /*
1125  * RecursivelyPlanQuery recursively plans a query, replaces it with a
1126  * result query and returns the subplan.
1127  *
1128  * Before we recursively plan the given subquery, we should ensure
1129  * that the subquery doesn't contain any references to the outer
1130  * queries (i.e., such queries cannot be separately planned). In
1131  * that case, the function doesn't recursively plan the input query
1132  * and immediately returns. Later, the planner decides on what to do
1133  * with the query.
1134  */
1135 static bool
RecursivelyPlanSubquery(Query * subquery,RecursivePlanningContext * planningContext)1136 RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningContext)
1137 {
1138 	uint64 planId = planningContext->planId;
1139 	Query *debugQuery = NULL;
1140 
1141 	if (ContainsReferencesToOuterQuery(subquery))
1142 	{
1143 		elog(DEBUG2, "skipping recursive planning for the subquery since it "
1144 					 "contains references to outer queries");
1145 
1146 		return false;
1147 	}
1148 
1149 	/*
1150 	 * Subquery will go through the standard planner, thus to properly deparse it
1151 	 * we keep its copy: debugQuery.
1152 	 */
1153 	if (IsLoggableLevel(DEBUG1))
1154 	{
1155 		debugQuery = copyObject(subquery);
1156 	}
1157 
1158 
1159 	/*
1160 	 * Create the subplan and append it to the list in the planning context.
1161 	 */
1162 	int subPlanId = list_length(planningContext->subPlanList) + 1;
1163 
1164 	DistributedSubPlan *subPlan = CreateDistributedSubPlan(subPlanId, subquery);
1165 	planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan);
1166 
1167 	/* build the result_id parameter for the call to read_intermediate_result */
1168 	char *resultId = GenerateResultId(planId, subPlanId);
1169 
1170 	/*
1171 	 * BuildSubPlanResultQuery() can optionally use provided column aliases.
1172 	 * We do not need to send additional alias list for subqueries.
1173 	 */
1174 	Query *resultQuery = BuildSubPlanResultQuery(subquery->targetList, NIL, resultId);
1175 
1176 	if (IsLoggableLevel(DEBUG1))
1177 	{
1178 		StringInfo subqueryString = makeStringInfo();
1179 
1180 		pg_get_query_def(debugQuery, subqueryString);
1181 
1182 		ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT
1183 								"_%u for subquery %s", planId, subPlanId,
1184 								ApplyLogRedaction(subqueryString->data))));
1185 	}
1186 
1187 	/* finally update the input subquery to point the result query */
1188 	*subquery = *resultQuery;
1189 	return true;
1190 }
1191 
1192 
1193 /*
1194  * CreateDistributedSubPlan creates a distributed subplan by recursively calling
1195  * the planner from the top, which may either generate a local plan or another
1196  * distributed plan, which can itself contain subplans.
1197  */
1198 static DistributedSubPlan *
CreateDistributedSubPlan(uint32 subPlanId,Query * subPlanQuery)1199 CreateDistributedSubPlan(uint32 subPlanId, Query *subPlanQuery)
1200 {
1201 	int cursorOptions = 0;
1202 
1203 	if (ContainsReadIntermediateResultFunction((Node *) subPlanQuery))
1204 	{
1205 		/*
1206 		 * Make sure we go through distributed planning if there are
1207 		 * read_intermediate_result calls, even if there are no distributed
1208 		 * tables in the query anymore.
1209 		 *
1210 		 * We cannot perform this check in the planner itself, since that
1211 		 * would also cause the workers to attempt distributed planning.
1212 		 */
1213 		cursorOptions |= CURSOR_OPT_FORCE_DISTRIBUTED;
1214 	}
1215 
1216 	DistributedSubPlan *subPlan = CitusMakeNode(DistributedSubPlan);
1217 	subPlan->plan = planner_compat(subPlanQuery, cursorOptions, NULL);
1218 	subPlan->subPlanId = subPlanId;
1219 
1220 	return subPlan;
1221 }
1222 
1223 
1224 /*
1225  * CteReferenceListWalker finds all references to CTEs in the top level of a query
1226  * and adds them to context->cteReferenceList.
1227  */
1228 static bool
CteReferenceListWalker(Node * node,CteReferenceWalkerContext * context)1229 CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context)
1230 {
1231 	if (node == NULL)
1232 	{
1233 		return false;
1234 	}
1235 
1236 	if (IsA(node, RangeTblEntry))
1237 	{
1238 		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
1239 
1240 		if (rangeTableEntry->rtekind == RTE_CTE &&
1241 			rangeTableEntry->ctelevelsup == context->level)
1242 		{
1243 			context->cteReferenceList = lappend(context->cteReferenceList,
1244 												rangeTableEntry);
1245 		}
1246 
1247 		/* caller will descend into range table entry */
1248 		return false;
1249 	}
1250 	else if (IsA(node, Query))
1251 	{
1252 		Query *query = (Query *) node;
1253 
1254 		context->level += 1;
1255 		query_tree_walker(query, CteReferenceListWalker, context,
1256 						  QTW_EXAMINE_RTES_BEFORE);
1257 		context->level -= 1;
1258 
1259 		return false;
1260 	}
1261 	else
1262 	{
1263 		return expression_tree_walker(node, CteReferenceListWalker, context);
1264 	}
1265 }
1266 
1267 
1268 /*
1269  * ContainsReferencesToOuterQuery determines whether the given query contains
1270  * anything that points outside of the query itself. Such queries cannot be
1271  * planned recursively.
1272  */
1273 bool
ContainsReferencesToOuterQuery(Query * query)1274 ContainsReferencesToOuterQuery(Query *query)
1275 {
1276 	VarLevelsUpWalkerContext context = { 0 };
1277 	int flags = 0;
1278 
1279 	return query_tree_walker(query, ContainsReferencesToOuterQueryWalker,
1280 							 &context, flags);
1281 }
1282 
1283 
1284 /*
1285  * ContainsReferencesToOuterQueryWalker determines whether the given query
1286  * contains any Vars that point more than context->level levels up.
1287  *
1288  * ContainsReferencesToOuterQueryWalker recursively descends into subqueries
1289  * and increases the level by 1 before recursing.
1290  */
1291 static bool
ContainsReferencesToOuterQueryWalker(Node * node,VarLevelsUpWalkerContext * context)1292 ContainsReferencesToOuterQueryWalker(Node *node, VarLevelsUpWalkerContext *context)
1293 {
1294 	if (node == NULL)
1295 	{
1296 		return false;
1297 	}
1298 
1299 	if (IsA(node, Var))
1300 	{
1301 		if (((Var *) node)->varlevelsup > context->level)
1302 		{
1303 			return true;
1304 		}
1305 
1306 		return false;
1307 	}
1308 	else if (IsA(node, Aggref))
1309 	{
1310 		if (((Aggref *) node)->agglevelsup > context->level)
1311 		{
1312 			return true;
1313 		}
1314 	}
1315 	else if (IsA(node, GroupingFunc))
1316 	{
1317 		if (((GroupingFunc *) node)->agglevelsup > context->level)
1318 		{
1319 			return true;
1320 		}
1321 
1322 		return false;
1323 	}
1324 	else if (IsA(node, PlaceHolderVar))
1325 	{
1326 		if (((PlaceHolderVar *) node)->phlevelsup > context->level)
1327 		{
1328 			return true;
1329 		}
1330 	}
1331 	else if (IsA(node, Query))
1332 	{
1333 		Query *query = (Query *) node;
1334 		int flags = 0;
1335 
1336 		context->level += 1;
1337 		bool found = query_tree_walker(query, ContainsReferencesToOuterQueryWalker,
1338 									   context, flags);
1339 		context->level -= 1;
1340 
1341 		return found;
1342 	}
1343 
1344 	return expression_tree_walker(node, ContainsReferencesToOuterQueryWalker,
1345 								  context);
1346 }
1347 
1348 
1349 /*
1350  * NodeContainsSubqueryReferencingOuterQuery determines whether the given node
1351  * contains anything that points outside of the query itself.
1352  */
1353 static bool
NodeContainsSubqueryReferencingOuterQuery(Node * node)1354 NodeContainsSubqueryReferencingOuterQuery(Node *node)
1355 {
1356 	List *sublinks = NIL;
1357 	ExtractSublinkWalker(node, &sublinks);
1358 
1359 	SubLink *sublink;
1360 	foreach_ptr(sublink, sublinks)
1361 	{
1362 		if (ContainsReferencesToOuterQuery(castNode(Query, sublink->subselect)))
1363 		{
1364 			return true;
1365 		}
1366 	}
1367 
1368 	return false;
1369 }
1370 
1371 
1372 /*
1373  * ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry
1374  * with a subquery. The function also pushes down the filters to the subquery.
1375  *
1376  * It then recursively plans the subquery. This subquery is wrapped with another subquery
1377  * as a trick to reduce network cost, because we currently don't have an easy way to
1378  * skip generating NULL's for non-required columns, and if we create (SELECT a, NULL, NULL FROM table)
1379  * then this will be sent over network and NULL's also occupy some space. Instead of this we generate:
1380  * (SELECT t.a, NULL, NULL FROM (SELECT a FROM table) t). The inner subquery will be recursively planned
1381  * but the outer part will not be yet it will still have the NULL columns so that the query is correct.
1382  */
1383 void
ReplaceRTERelationWithRteSubquery(RangeTblEntry * rangeTableEntry,List * requiredAttrNumbers,RecursivePlanningContext * context)1384 ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
1385 								  List *requiredAttrNumbers,
1386 								  RecursivePlanningContext *context)
1387 {
1388 	Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers);
1389 	List *outerQueryTargetList = CreateAllTargetListForRelation(rangeTableEntry->relid,
1390 																requiredAttrNumbers);
1391 
1392 	List *restrictionList =
1393 		GetRestrictInfoListForRelation(rangeTableEntry,
1394 									   context->plannerRestrictionContext);
1395 	List *copyRestrictionList = copyObject(restrictionList);
1396 	Expr *andedBoundExpressions = make_ands_explicit(copyRestrictionList);
1397 	subquery->jointree->quals = (Node *) andedBoundExpressions;
1398 
1399 	/*
1400 	 * Originally the quals were pointing to the RTE and its varno
1401 	 * was pointing to its index in rtable. However now we converted the RTE
1402 	 * to a subquery and the quals should be pointing to that subquery, which
1403 	 * is the only RTE in its rtable, hence we update the varnos so that they
1404 	 * point to the subquery RTE.
1405 	 * Originally: rtable: [rte1, current_rte, rte3...]
1406 	 * Now: rtable: [rte1, subquery[current_rte], rte3...] --subquery[current_rte] refers to its rtable.
1407 	 */
1408 	Node *quals = subquery->jointree->quals;
1409 	UpdateVarNosInNode(quals, SINGLE_RTE_INDEX);
1410 
1411 	/* replace the function with the constructed subquery */
1412 	rangeTableEntry->rtekind = RTE_SUBQUERY;
1413 	rangeTableEntry->subquery = subquery;
1414 
1415 	/*
1416 	 * If the relation is inherited, it'll still be inherited as
1417 	 * we've copied it earlier. This is to prevent the newly created
1418 	 * subquery being treated as inherited.
1419 	 */
1420 	rangeTableEntry->inh = false;
1421 
1422 	if (IsLoggableLevel(DEBUG1))
1423 	{
1424 		char *relationAndAliasName = GetRelationNameAndAliasName(rangeTableEntry);
1425 		ereport(DEBUG1, (errmsg("Wrapping relation %s to a subquery",
1426 								relationAndAliasName)));
1427 	}
1428 
1429 	/* as we created the subquery, now forcefully recursively plan it */
1430 	bool recursivelyPlanned = RecursivelyPlanSubquery(subquery, context);
1431 	if (!recursivelyPlanned)
1432 	{
1433 		ereport(ERROR, (errmsg(
1434 							"unexpected state: query should have been recursively planned")));
1435 	}
1436 
1437 	Query *outerSubquery = CreateOuterSubquery(rangeTableEntry, outerQueryTargetList);
1438 	rangeTableEntry->subquery = outerSubquery;
1439 }
1440 
1441 
1442 /*
1443  * GetRelationNameAndAliasName returns the relname + alias name if
1444  * alias name exists otherwise only the relname is returned.
1445  */
1446 static char *
GetRelationNameAndAliasName(RangeTblEntry * rangeTableEntry)1447 GetRelationNameAndAliasName(RangeTblEntry *rangeTableEntry)
1448 {
1449 	StringInfo str = makeStringInfo();
1450 	appendStringInfo(str, "\"%s\"", get_rel_name(rangeTableEntry->relid));
1451 
1452 	char *aliasName = NULL;
1453 	if (rangeTableEntry->alias)
1454 	{
1455 		aliasName = rangeTableEntry->alias->aliasname;
1456 	}
1457 
1458 	if (aliasName)
1459 	{
1460 		appendStringInfo(str, " \"%s\"", aliasName);
1461 	}
1462 	return str->data;
1463 }
1464 
1465 
1466 /*
1467  * CreateOuterSubquery creates outer subquery which contains
1468  * the given range table entry in its rtable.
1469  */
1470 static Query *
CreateOuterSubquery(RangeTblEntry * rangeTableEntry,List * outerSubqueryTargetList)1471 CreateOuterSubquery(RangeTblEntry *rangeTableEntry, List *outerSubqueryTargetList)
1472 {
1473 	List *innerSubqueryColNames = GenerateRequiredColNamesFromTargetList(
1474 		outerSubqueryTargetList);
1475 
1476 	Query *outerSubquery = makeNode(Query);
1477 	outerSubquery->commandType = CMD_SELECT;
1478 
1479 	/* we copy the input rteRelation to preserve the rteIdentity */
1480 	RangeTblEntry *innerSubqueryRTE = copyObject(rangeTableEntry);
1481 
1482 	innerSubqueryRTE->eref->colnames = innerSubqueryColNames;
1483 	outerSubquery->rtable = list_make1(innerSubqueryRTE);
1484 
1485 	/* set the FROM expression to the subquery */
1486 	RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
1487 	newRangeTableRef->rtindex = 1;
1488 	outerSubquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
1489 
1490 	outerSubquery->targetList = outerSubqueryTargetList;
1491 	return outerSubquery;
1492 }
1493 
1494 
1495 /*
1496  * GenerateRequiredColNamesFromTargetList generates the required colnames
1497  * from the given target list.
1498  */
1499 static List *
GenerateRequiredColNamesFromTargetList(List * targetList)1500 GenerateRequiredColNamesFromTargetList(List *targetList)
1501 {
1502 	TargetEntry *entry = NULL;
1503 	List *innerSubqueryColNames = NIL;
1504 	foreach_ptr(entry, targetList)
1505 	{
1506 		if (IsA(entry->expr, Var))
1507 		{
1508 			/*
1509 			 * column names of the inner subquery should only contain the
1510 			 * required columns, as in if we choose 'b' from ('a','b') colnames
1511 			 * should be 'a' not ('a','b')
1512 			 */
1513 			innerSubqueryColNames = lappend(innerSubqueryColNames, makeString(
1514 												entry->resname));
1515 		}
1516 	}
1517 	return innerSubqueryColNames;
1518 }
1519 
1520 
1521 /*
1522  * UpdateVarNosInNode iterates the Vars in the
1523  * given node and updates the varno's as the newVarNo.
1524  */
1525 static void
UpdateVarNosInNode(Node * node,Index newVarNo)1526 UpdateVarNosInNode(Node *node, Index newVarNo)
1527 {
1528 	List *varList = pull_var_clause(node, PVC_RECURSE_AGGREGATES |
1529 									PVC_RECURSE_PLACEHOLDERS);
1530 	Var *var = NULL;
1531 	foreach_ptr(var, varList)
1532 	{
1533 		var->varno = newVarNo;
1534 	}
1535 }
1536 
1537 
1538 /*
1539  * IsRecursivelyPlannableRelation returns true if the given range table entry
1540  * is a relation type that can be converted to a subquery.
1541  */
1542 bool
IsRecursivelyPlannableRelation(RangeTblEntry * rangeTableEntry)1543 IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry)
1544 {
1545 	if (rangeTableEntry->rtekind != RTE_RELATION)
1546 	{
1547 		return false;
1548 	}
1549 	return rangeTableEntry->relkind == RELKIND_PARTITIONED_TABLE ||
1550 		   rangeTableEntry->relkind == RELKIND_RELATION ||
1551 		   rangeTableEntry->relkind == RELKIND_MATVIEW ||
1552 		   rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE;
1553 }
1554 
1555 
1556 /*
1557  * ContainsLocalTableDistributedTableJoin returns true if the input range table list
1558  * contains a direct join between local RTE and an RTE that contains a distributed
1559  * or reference table.
1560  */
1561 bool
ContainsLocalTableDistributedTableJoin(List * rangeTableList)1562 ContainsLocalTableDistributedTableJoin(List *rangeTableList)
1563 {
1564 	bool containsLocalTable = false;
1565 	bool containsDistributedTable = false;
1566 
1567 	RangeTblEntry *rangeTableEntry = NULL;
1568 	foreach_ptr(rangeTableEntry, rangeTableList)
1569 	{
1570 		if (FindNodeMatchingCheckFunctionInRangeTableList(list_make1(rangeTableEntry),
1571 														  IsDistributedOrReferenceTableRTE))
1572 		{
1573 			containsDistributedTable = true;
1574 		}
1575 		else if (IsRecursivelyPlannableRelation(rangeTableEntry) &&
1576 				 IsLocalTableRteOrMatView((Node *) rangeTableEntry))
1577 		{
1578 			/* we consider citus local tables as local table */
1579 			containsLocalTable = true;
1580 		}
1581 	}
1582 
1583 	return containsLocalTable && containsDistributedTable;
1584 }
1585 
1586 
1587 /*
1588  * WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries
1589  * of a query and wraps the functions inside (SELECT * FROM fnc() f)
1590  * subqueries, so that those functions will be executed on the coordinator if
1591  * necessary.
1592  *
1593  * We wrap all the functions that are used in joins except the ones that are
1594  * laterally joined or have WITH ORDINALITY clauses.
1595  * */
1596 static void
WrapFunctionsInSubqueries(Query * query)1597 WrapFunctionsInSubqueries(Query *query)
1598 {
1599 	List *rangeTableList = query->rtable;
1600 	ListCell *rangeTableCell = NULL;
1601 
1602 	/*
1603 	 * If we have only one function call in a query without any joins, we can
1604 	 * easily decide where to execute it.
1605 	 *
1606 	 * If there are some subqueries and/or functions that are joined with a
1607 	 * function, it is not trivial to decide whether we should run this
1608 	 * function in the coordinator or in workers and therefore we may need to
1609 	 * wrap some of those functions in subqueries.
1610 	 *
1611 	 * If we have only one RTE, we leave the parsed query tree as it is. This
1612 	 * also makes sure we do not wrap an already wrapped function call
1613 	 * because we know that there will always be 1 RTE in a wrapped function.
1614 	 * */
1615 	if (list_length(rangeTableList) < 2)
1616 	{
1617 		return;
1618 	}
1619 
1620 	/* iterate over all RTEs and wrap them if necessary */
1621 	foreach(rangeTableCell, rangeTableList)
1622 	{
1623 		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
1624 
1625 		if (ShouldTransformRTE(rangeTableEntry))
1626 		{
1627 			TransformFunctionRTE(rangeTableEntry);
1628 		}
1629 	}
1630 }
1631 
1632 
1633 /*
1634  * TransformFunctionRTE wraps a given function RangeTableEntry
1635  * inside a (SELECT * from function() f) subquery.
1636  *
1637  * The said RangeTableEntry is modified and now points to the new subquery.
1638  * */
1639 static void
TransformFunctionRTE(RangeTblEntry * rangeTblEntry)1640 TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
1641 {
1642 	Query *subquery = makeNode(Query);
1643 	RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
1644 	Var *targetColumn = NULL;
1645 	TargetEntry *targetEntry = NULL;
1646 	AttrNumber targetColumnIndex = 0;
1647 
1648 	RangeTblFunction *rangeTblFunction = linitial(rangeTblEntry->functions);
1649 
1650 	subquery->commandType = CMD_SELECT;
1651 
1652 	/* copy the input rangeTblEntry to prevent cycles */
1653 	RangeTblEntry *newRangeTableEntry = copyObject(rangeTblEntry);
1654 
1655 	/* set the FROM expression to the subquery */
1656 	subquery->rtable = list_make1(newRangeTableEntry);
1657 	newRangeTableRef->rtindex = 1;
1658 	subquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
1659 
1660 	/* Determine the result type of the function.
1661 	 *
1662 	 * If function return type is not composite or rowtype can't be determined,
1663 	 * tupleDesc is set to null here
1664 	 */
1665 	TupleDesc tupleDesc = (TupleDesc) get_expr_result_tupdesc(rangeTblFunction->funcexpr,
1666 															  true);
1667 
1668 	/*
1669 	 * If tupleDesc is not null, we iterate over all the attributes and
1670 	 * create targetEntries
1671 	 * */
1672 	if (tupleDesc)
1673 	{
1674 		/*
1675 		 * A sample function join that end up here:
1676 		 *
1677 		 * CREATE FUNCTION f(..) RETURNS TABLE(c1 int, c2 text) AS .. ;
1678 		 * SELECT .. FROM table JOIN f(..) ON ( .. ) ;
1679 		 *
1680 		 * We will iterate over Tuple Description attributes. i.e (c1 int, c2 text)
1681 		 */
1682 		if (tupleDesc->natts > MaxAttrNumber)
1683 		{
1684 			ereport(ERROR, (errmsg("bad number of tuple descriptor attributes")));
1685 		}
1686 		AttrNumber natts = tupleDesc->natts;
1687 		for (targetColumnIndex = 0; targetColumnIndex < natts;
1688 			 targetColumnIndex++)
1689 		{
1690 			FormData_pg_attribute *attribute = TupleDescAttr(tupleDesc,
1691 															 targetColumnIndex);
1692 			Oid columnType = attribute->atttypid;
1693 			char *columnName = attribute->attname.data;
1694 
1695 			/*
1696 			 * The indexing of attributes and TupleDesc and varattno differ
1697 			 *
1698 			 * varattno=0 corresponds to whole row
1699 			 * varattno=1 corresponds to first column that is stored in tupDesc->attrs[0]
1700 			 *
1701 			 * That's why we need to add one to the targetColumnIndex
1702 			 * */
1703 			targetColumn = makeVar(1, targetColumnIndex + 1, columnType, -1, InvalidOid,
1704 								   0);
1705 			targetEntry = makeTargetEntry((Expr *) targetColumn, targetColumnIndex + 1,
1706 										  columnName, false);
1707 			subquery->targetList = lappend(subquery->targetList, targetEntry);
1708 		}
1709 	}
1710 
1711 	/*
1712 	 * If tupleDesc is NULL we have 2 different cases:
1713 	 *
1714 	 * 1. The function returns a record but the attributes can not be
1715 	 * determined just by looking at the function definition. In this case the
1716 	 * column names and types must be defined explicitly in the query
1717 	 *
1718 	 * 2. The function returns a non-composite type (e.g. int, text, jsonb ..)
1719 	 * */
1720 	else
1721 	{
1722 		/* create target entries for all columns returned by the function */
1723 		ListCell *functionColumnName = NULL;
1724 
1725 		List *functionColumnNames = rangeTblEntry->eref->colnames;
1726 		foreach(functionColumnName, functionColumnNames)
1727 		{
1728 			char *columnName = strVal(lfirst(functionColumnName));
1729 			Oid columnType = InvalidOid;
1730 
1731 			/*
1732 			 * If the function returns a set of records, the query needs
1733 			 * to explicitly name column names and types
1734 			 *
1735 			 * Use explicitly defined types in the query if they are
1736 			 * available
1737 			 * */
1738 			if (list_length(rangeTblFunction->funccoltypes) > 0)
1739 			{
1740 				/*
1741 				 * A sample function join that end up here:
1742 				 *
1743 				 * CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS
1744 				 * $cmd$
1745 				 * SELECT x, x+1 FROM generate_series(0,4) f(x)
1746 				 * $cmd$
1747 				 * LANGUAGE SQL;
1748 				 *
1749 				 * SELECT *
1750 				 * FROM table1 JOIN get_set_of_records() AS t2(x int, y int)
1751 				 * ON (id = x);
1752 				 *
1753 				 * Note that the function definition does not have column
1754 				 * names and types. Therefore the user needs to explicitly
1755 				 * state them in the query
1756 				 * */
1757 				columnType = list_nth_oid(rangeTblFunction->funccoltypes,
1758 										  targetColumnIndex);
1759 			}
1760 
1761 			/* use the types in the function definition otherwise */
1762 			else
1763 			{
1764 				/*
1765 				 * Only functions returning simple types end up here.
1766 				 * A sample function:
1767 				 *
1768 				 * CREATE FUNCTION add(integer, integer) RETURNS integer AS
1769 				 * 'SELECT $1 + $2;'
1770 				 * LANGUAGE SQL;
1771 				 * SELECT * FROM table JOIN add(3,5) sum ON ( .. ) ;
1772 				 * */
1773 				FuncExpr *funcExpr = (FuncExpr *) rangeTblFunction->funcexpr;
1774 				columnType = funcExpr->funcresulttype;
1775 			}
1776 
1777 			/* Note that the column k is associated with varattno/resno of k+1 */
1778 			targetColumn = makeVar(1, targetColumnIndex + 1, columnType, -1,
1779 								   InvalidOid, 0);
1780 			targetEntry = makeTargetEntry((Expr *) targetColumn,
1781 										  targetColumnIndex + 1, columnName, false);
1782 			subquery->targetList = lappend(subquery->targetList, targetEntry);
1783 
1784 			targetColumnIndex++;
1785 		}
1786 	}
1787 
1788 	/* replace the function with the constructed subquery */
1789 	rangeTblEntry->rtekind = RTE_SUBQUERY;
1790 	rangeTblEntry->subquery = subquery;
1791 }
1792 
1793 
1794 /*
1795  * ShouldTransformRTE determines whether a given RTE should bne wrapped in a
1796  * subquery.
1797  *
1798  * Not all functions should be wrapped in a subquery for now. As we support more
1799  * functions to be used in joins, the constraints here will be relaxed.
1800  * */
1801 static bool
ShouldTransformRTE(RangeTblEntry * rangeTableEntry)1802 ShouldTransformRTE(RangeTblEntry *rangeTableEntry)
1803 {
1804 	/*
1805 	 * We should wrap only function rtes that are not LATERAL and
1806 	 * without WITH ORDINALITY clause
1807 	 */
1808 	if (rangeTableEntry->rtekind != RTE_FUNCTION ||
1809 		rangeTableEntry->lateral ||
1810 		rangeTableEntry->funcordinality)
1811 	{
1812 		return false;
1813 	}
1814 	return true;
1815 }
1816 
1817 
1818 /*
1819  * BuildSubPlanResultQuery returns a query of the form:
1820  *
1821  * SELECT
1822  *   <target list>
1823  * FROM
1824  *   read_intermediate_result('<resultId>', '<copy format'>)
1825  *   AS res (<column definition list>);
1826  *
1827  * The caller can optionally supply a columnAliasList, which is useful for
1828  * CTEs that have column aliases.
1829  *
1830  * If any of the types in the target list cannot be used in the binary copy format,
1831  * then the copy format 'text' is used, otherwise 'binary' is used.
1832  */
1833 Query *
BuildSubPlanResultQuery(List * targetEntryList,List * columnAliasList,char * resultId)1834 BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList, char *resultId)
1835 {
1836 	Oid functionOid = CitusReadIntermediateResultFuncId();
1837 	bool useBinaryCopyFormat = CanUseBinaryCopyFormatForTargetList(targetEntryList);
1838 
1839 	Const *resultIdConst = makeNode(Const);
1840 	resultIdConst->consttype = TEXTOID;
1841 	resultIdConst->consttypmod = -1;
1842 	resultIdConst->constlen = -1;
1843 	resultIdConst->constvalue = CStringGetTextDatum(resultId);
1844 	resultIdConst->constbyval = false;
1845 	resultIdConst->constisnull = false;
1846 	resultIdConst->location = -1;
1847 
1848 	return BuildReadIntermediateResultsQuery(targetEntryList, columnAliasList,
1849 											 resultIdConst, functionOid,
1850 											 useBinaryCopyFormat);
1851 }
1852 
1853 
1854 /*
1855  * BuildReadIntermediateResultsArrayQuery returns a query of the form:
1856  *
1857  * SELECT
1858  *   <target list>
1859  * FROM
1860  *   read_intermediate_results(ARRAY['<resultId>', ...]::text[], '<copy format'>)
1861  *   AS res (<column definition list>);
1862  *
1863  * The caller can optionally supply a columnAliasList, which is useful for
1864  * CTEs that have column aliases.
1865  *
1866  * If useBinaryCopyFormat is true, then 'binary' format is used. Otherwise,
1867  * 'text' format is used.
1868  */
1869 Query *
BuildReadIntermediateResultsArrayQuery(List * targetEntryList,List * columnAliasList,List * resultIdList,bool useBinaryCopyFormat)1870 BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
1871 									   List *columnAliasList,
1872 									   List *resultIdList,
1873 									   bool useBinaryCopyFormat)
1874 {
1875 	Oid functionOid = CitusReadIntermediateResultArrayFuncId();
1876 
1877 	Const *resultIdConst = makeNode(Const);
1878 	resultIdConst->consttype = TEXTARRAYOID;
1879 	resultIdConst->consttypmod = -1;
1880 	resultIdConst->constlen = -1;
1881 	resultIdConst->constvalue = PointerGetDatum(strlist_to_textarray(resultIdList));
1882 	resultIdConst->constbyval = false;
1883 	resultIdConst->constisnull = false;
1884 	resultIdConst->location = -1;
1885 
1886 	return BuildReadIntermediateResultsQuery(targetEntryList, columnAliasList,
1887 											 resultIdConst, functionOid,
1888 											 useBinaryCopyFormat);
1889 }
1890 
1891 
1892 /*
1893  * BuildReadIntermediateResultsQuery is the common code for generating
1894  * queries to read from result files. It is used by
1895  * BuildReadIntermediateResultsArrayQuery and BuildSubPlanResultQuery.
1896  */
1897 static Query *
BuildReadIntermediateResultsQuery(List * targetEntryList,List * columnAliasList,Const * resultIdConst,Oid functionOid,bool useBinaryCopyFormat)1898 BuildReadIntermediateResultsQuery(List *targetEntryList, List *columnAliasList,
1899 								  Const *resultIdConst, Oid functionOid,
1900 								  bool useBinaryCopyFormat)
1901 {
1902 	List *funcColNames = NIL;
1903 	List *funcColTypes = NIL;
1904 	List *funcColTypMods = NIL;
1905 	List *funcColCollations = NIL;
1906 	ListCell *targetEntryCell = NULL;
1907 	List *targetList = NIL;
1908 	int columnNumber = 1;
1909 	Oid copyFormatId = BinaryCopyFormatId();
1910 	int columnAliasCount = list_length(columnAliasList);
1911 
1912 	/* build the target list and column definition list */
1913 	foreach(targetEntryCell, targetEntryList)
1914 	{
1915 		TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
1916 		Node *targetExpr = (Node *) targetEntry->expr;
1917 		char *columnName = targetEntry->resname;
1918 		Oid columnType = exprType(targetExpr);
1919 		Oid columnTypMod = exprTypmod(targetExpr);
1920 		Oid columnCollation = exprCollation(targetExpr);
1921 
1922 		if (targetEntry->resjunk)
1923 		{
1924 			continue;
1925 		}
1926 
1927 		funcColNames = lappend(funcColNames, makeString(columnName));
1928 		funcColTypes = lappend_int(funcColTypes, columnType);
1929 		funcColTypMods = lappend_int(funcColTypMods, columnTypMod);
1930 		funcColCollations = lappend_int(funcColCollations, columnCollation);
1931 
1932 		Var *functionColumnVar = makeNode(Var);
1933 		functionColumnVar->varno = 1;
1934 		functionColumnVar->varattno = columnNumber;
1935 		functionColumnVar->vartype = columnType;
1936 		functionColumnVar->vartypmod = columnTypMod;
1937 		functionColumnVar->varcollid = columnCollation;
1938 		functionColumnVar->varlevelsup = 0;
1939 		functionColumnVar->varnosyn = 1;
1940 		functionColumnVar->varattnosyn = columnNumber;
1941 		functionColumnVar->location = -1;
1942 
1943 		TargetEntry *newTargetEntry = makeNode(TargetEntry);
1944 		newTargetEntry->expr = (Expr *) functionColumnVar;
1945 		newTargetEntry->resno = columnNumber;
1946 
1947 		/*
1948 		 * Rename the column only if a column alias is defined.
1949 		 * Notice that column alias count could be less than actual
1950 		 * column count. We only use provided aliases and keep the
1951 		 * original column names if no alias is defined.
1952 		 */
1953 		if (columnAliasCount >= columnNumber)
1954 		{
1955 			Value *columnAlias = (Value *) list_nth(columnAliasList, columnNumber - 1);
1956 			Assert(IsA(columnAlias, String));
1957 			newTargetEntry->resname = strVal(columnAlias);
1958 		}
1959 		else
1960 		{
1961 			newTargetEntry->resname = columnName;
1962 		}
1963 		newTargetEntry->resjunk = false;
1964 
1965 		targetList = lappend(targetList, newTargetEntry);
1966 
1967 		columnNumber++;
1968 	}
1969 
1970 	/* build the citus_copy_format parameter for the call to read_intermediate_result */
1971 	if (!useBinaryCopyFormat)
1972 	{
1973 		copyFormatId = TextCopyFormatId();
1974 	}
1975 
1976 	Const *resultFormatConst = makeNode(Const);
1977 	resultFormatConst->consttype = CitusCopyFormatTypeId();
1978 	resultFormatConst->consttypmod = -1;
1979 	resultFormatConst->constlen = 4;
1980 	resultFormatConst->constvalue = ObjectIdGetDatum(copyFormatId);
1981 	resultFormatConst->constbyval = true;
1982 	resultFormatConst->constisnull = false;
1983 	resultFormatConst->location = -1;
1984 
1985 	/* build the call to read_intermediate_result */
1986 	FuncExpr *funcExpr = makeNode(FuncExpr);
1987 	funcExpr->funcid = functionOid;
1988 	funcExpr->funcretset = true;
1989 	funcExpr->funcvariadic = false;
1990 	funcExpr->funcformat = 0;
1991 	funcExpr->funccollid = 0;
1992 	funcExpr->inputcollid = 0;
1993 	funcExpr->location = -1;
1994 	funcExpr->args = list_make2(resultIdConst, resultFormatConst);
1995 
1996 	/* build the RTE for the call to read_intermediate_result */
1997 	RangeTblFunction *rangeTableFunction = makeNode(RangeTblFunction);
1998 	rangeTableFunction->funccolcount = list_length(funcColNames);
1999 	rangeTableFunction->funccolnames = funcColNames;
2000 	rangeTableFunction->funccoltypes = funcColTypes;
2001 	rangeTableFunction->funccoltypmods = funcColTypMods;
2002 	rangeTableFunction->funccolcollations = funcColCollations;
2003 	rangeTableFunction->funcparams = NULL;
2004 	rangeTableFunction->funcexpr = (Node *) funcExpr;
2005 
2006 	Alias *funcAlias = makeNode(Alias);
2007 	funcAlias->aliasname = "intermediate_result";
2008 	funcAlias->colnames = funcColNames;
2009 
2010 	RangeTblEntry *rangeTableEntry = makeNode(RangeTblEntry);
2011 	rangeTableEntry->rtekind = RTE_FUNCTION;
2012 	rangeTableEntry->functions = list_make1(rangeTableFunction);
2013 	rangeTableEntry->inFromCl = true;
2014 	rangeTableEntry->eref = funcAlias;
2015 
2016 	/* build the join tree using the read_intermediate_result RTE */
2017 	RangeTblRef *rangeTableRef = makeNode(RangeTblRef);
2018 	rangeTableRef->rtindex = 1;
2019 
2020 	FromExpr *joinTree = makeNode(FromExpr);
2021 	joinTree->fromlist = list_make1(rangeTableRef);
2022 
2023 	/* build the SELECT query */
2024 	Query *resultQuery = makeNode(Query);
2025 	resultQuery->commandType = CMD_SELECT;
2026 	resultQuery->rtable = list_make1(rangeTableEntry);
2027 	resultQuery->jointree = joinTree;
2028 	resultQuery->targetList = targetList;
2029 
2030 	return resultQuery;
2031 }
2032 
2033 
2034 /*
2035  * GenerateResultId generates the result ID that is used to identify an intermediate
2036  * result of the subplan with the given plan ID and subplan ID.
2037  */
2038 char *
GenerateResultId(uint64 planId,uint32 subPlanId)2039 GenerateResultId(uint64 planId, uint32 subPlanId)
2040 {
2041 	StringInfo resultId = makeStringInfo();
2042 
2043 	appendStringInfo(resultId, UINT64_FORMAT "_%u", planId, subPlanId);
2044 
2045 	return resultId->data;
2046 }
2047 
2048 
2049 /*
2050  * GeneratingSubplans returns true if we are currently in the process of
2051  * generating subplans.
2052  */
2053 bool
GeneratingSubplans(void)2054 GeneratingSubplans(void)
2055 {
2056 	return recursivePlanningDepth > 0;
2057 }
2058