/*-------------------------------------------------------------------------
 *
 * recursive_planning.c
 *
 * Logic for calling the postgres planner recursively for CTEs and
 * non-pushdownable subqueries in distributed queries.
 *
 * PostgreSQL with Citus can execute 4 types of queries:
 *
 * - Postgres queries on local tables and functions.
 *
 *   These queries can use all SQL features, but they may not reference
 *   distributed tables.
 *
 * - Router queries that can be executed on a single node by replacing
 *   table names with shard names.
 *
 *   These queries can use nearly all SQL features, but only if they have
 *   a single-valued filter on the distribution column.
 *
 * - Multi-shard queries that can be executed by performing a task for each
 *   shard in a distributed table and performing a merge step.
 *
 *   These queries have limited SQL support. They may only include
 *   subqueries if the subquery can be executed on each shard by replacing
 *   table names with shard names and concatenating the result.
 *
 *   These queries have very limited SQL support and only support basic
 *   inner joins and subqueries without joins.
 *
 * To work around the limitations of these planners, we recursively call
 * the planner for CTEs and unsupported subqueries to obtain a list of
 * subplans.
 *
 * During execution, each subplan is executed separately through the method
 * that is appropriate for that query. The results are written to temporary
 * files on the workers. In the original query, the CTEs and subqueries are
 * replaced by mini-subqueries that read from the temporary files.
 *
 * This allows almost all SQL to be directly or indirectly supported,
 * because if all subqueries that contain distributed tables have been
 * replaced then what remains is a router query which can use nearly all
 * SQL features.
 *
 * Copyright (c) Citus Data, Inc.
 *-------------------------------------------------------------------------
 */
48
49 #include "postgres.h"
50
51 #include "distributed/pg_version_constants.h"
52
53 #include "funcapi.h"
54
55 #include "catalog/pg_type.h"
56 #include "catalog/pg_class.h"
57 #include "distributed/citus_nodes.h"
58 #include "distributed/citus_ruleutils.h"
59 #include "distributed/commands/multi_copy.h"
60 #include "distributed/distributed_planner.h"
61 #include "distributed/errormessage.h"
62 #include "distributed/local_distributed_join_planner.h"
63 #include "distributed/listutils.h"
64 #include "distributed/log_utils.h"
65 #include "distributed/metadata_cache.h"
66 #include "distributed/multi_logical_planner.h"
67 #include "distributed/multi_logical_optimizer.h"
68 #include "distributed/multi_router_planner.h"
69 #include "distributed/multi_physical_planner.h"
70 #include "distributed/multi_server_executor.h"
71 #include "distributed/query_colocation_checker.h"
72 #include "distributed/query_pushdown_planning.h"
73 #include "distributed/recursive_planning.h"
74 #include "distributed/relation_restriction_equivalence.h"
75 #include "distributed/log_utils.h"
76 #include "distributed/shard_pruning.h"
77 #include "distributed/version_compat.h"
78 #include "lib/stringinfo.h"
79 #include "optimizer/clauses.h"
80 #include "optimizer/optimizer.h"
81 #include "optimizer/planner.h"
82 #include "optimizer/prep.h"
83 #include "parser/parsetree.h"
84 #include "nodes/makefuncs.h"
85 #include "nodes/nodeFuncs.h"
86 #include "nodes/nodes.h"
87 #include "nodes/nodeFuncs.h"
88 #include "nodes/pg_list.h"
89 #include "nodes/primnodes.h"
90 #include "nodes/pathnodes.h"
91 #include "utils/builtins.h"
92 #include "utils/guc.h"
93 #include "utils/lsyscache.h"
94
95 /*
96 * RecursivePlanningContext is used to recursively plan subqueries
97 * and CTEs, pull results to the coordinator, and push it back into
98 * the workers.
99 */
100 struct RecursivePlanningContextInternal
101 {
102 int level;
103 uint64 planId;
104 bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */
105 List *subPlanList;
106 PlannerRestrictionContext *plannerRestrictionContext;
107 };
108
/*
 * Depth of the recursive planner call we are currently in. The counter is
 * bumped on entry to GenerateSubplansForSubqueriesAndCTEs() and dropped again
 * right before that function returns or raises its deferred error.
 */
static int recursivePlanningDepth = 0;
111
112 /*
113 * CteReferenceWalkerContext is used to collect CTE references in
114 * CteReferenceListWalker.
115 */
116 typedef struct CteReferenceWalkerContext
117 {
118 int level;
119 List *cteReferenceList;
120 } CteReferenceWalkerContext;
121
/*
 * VarLevelsUpWalkerContext is the walker state used to detect Vars in a
 * (sub)query that point at an upper query level; such a (sub)query cannot be
 * planned separately.
 */
typedef struct VarLevelsUpWalkerContext
{
	/* NOTE(review): presumably the nesting level relative to the walk's start - confirm */
	int level;
} VarLevelsUpWalkerContext;
130
131
132 /* local function forward declarations */
133 static DeferredErrorMessage * RecursivelyPlanSubqueriesAndCTEs(Query *query,
134 RecursivePlanningContext *
135 context);
136 static bool ShouldRecursivelyPlanNonColocatedSubqueries(Query *subquery,
137 RecursivePlanningContext *
138 context);
139 static bool ContainsSubquery(Query *query);
140 static void RecursivelyPlanNonColocatedSubqueries(Query *subquery,
141 RecursivePlanningContext *context);
142 static void RecursivelyPlanNonColocatedJoinWalker(Node *joinNode,
143 ColocatedJoinChecker *
144 colocatedJoinChecker,
145 RecursivePlanningContext *
146 recursivePlanningContext);
147 static void RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
148 ColocatedJoinChecker *
149 colocatedJoinChecker,
150 RecursivePlanningContext *
151 recursivePlanningContext);
152 static List * SublinkListFromWhere(Query *originalQuery);
153 static bool ExtractSublinkWalker(Node *node, List **sublinkList);
154 static bool ShouldRecursivelyPlanSublinks(Query *query);
155 static bool RecursivelyPlanAllSubqueries(Node *node,
156 RecursivePlanningContext *planningContext);
157 static DeferredErrorMessage * RecursivelyPlanCTEs(Query *query,
158 RecursivePlanningContext *context);
159 static bool RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context);
160 static bool ShouldRecursivelyPlanSubquery(Query *subquery,
161 RecursivePlanningContext *context);
162 static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery,
163 PlannerRestrictionContext *
164 restrictionContext);
165 static bool ShouldRecursivelyPlanSetOperation(Query *query,
166 RecursivePlanningContext *context);
167 static bool RecursivelyPlanSubquery(Query *subquery,
168 RecursivePlanningContext *planningContext);
169 static void RecursivelyPlanSetOperations(Query *query, Node *node,
170 RecursivePlanningContext *context);
171 static bool IsLocalTableRteOrMatView(Node *node);
172 static DistributedSubPlan * CreateDistributedSubPlan(uint32 subPlanId,
173 Query *subPlanQuery);
174 static bool CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context);
175 static bool ContainsReferencesToOuterQueryWalker(Node *node,
176 VarLevelsUpWalkerContext *context);
177 static bool NodeContainsSubqueryReferencingOuterQuery(Node *node);
178 static void WrapFunctionsInSubqueries(Query *query);
179 static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry);
180 static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry);
181 static Query * BuildReadIntermediateResultsQuery(List *targetEntryList,
182 List *columnAliasList,
183 Const *resultIdConst, Oid functionOid,
184 bool useBinaryCopyFormat);
185 static void UpdateVarNosInNode(Node *node, Index newVarNo);
186 static Query * CreateOuterSubquery(RangeTblEntry *rangeTableEntry,
187 List *outerSubqueryTargetList);
188 static List * GenerateRequiredColNamesFromTargetList(List *targetList);
189 static char * GetRelationNameAndAliasName(RangeTblEntry *rangeTablentry);
190
191 /*
192 * GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs.
193 * The function returns the subplans if necessary. For the details of when/how subplans are
194 * generated, see RecursivelyPlanSubqueriesAndCTEs().
195 *
196 * Note that the input originalQuery query is modified if any subplans are generated.
197 */
198 List *
GenerateSubplansForSubqueriesAndCTEs(uint64 planId,Query * originalQuery,PlannerRestrictionContext * plannerRestrictionContext)199 GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
200 PlannerRestrictionContext *plannerRestrictionContext)
201 {
202 RecursivePlanningContext context;
203
204 recursivePlanningDepth++;
205
206 /*
207 * Plan subqueries and CTEs that cannot be pushed down by recursively
208 * calling the planner and add the resulting plans to subPlanList.
209 */
210 context.level = 0;
211 context.planId = planId;
212 context.subPlanList = NIL;
213 context.plannerRestrictionContext = plannerRestrictionContext;
214
215 /*
216 * Calculating the distribution key equality upfront is a trade-off for us.
217 *
218 * When the originalQuery contains the distribution key equality, we'd be
219 * able to skip further checks for each lower level subqueries (i.e., if the
220 * all query contains distribution key equality, each subquery also contains
221 * distribution key equality.)
222 *
223 * When the originalQuery doesn't contain the distribution key equality,
224 * calculating this wouldn't help us at all, we should individually check
225 * each each subquery and subquery joins among subqueries.
226 */
227 context.allDistributionKeysInQueryAreEqual =
228 AllDistributionKeysInQueryAreEqual(originalQuery, plannerRestrictionContext);
229
230 DeferredErrorMessage *error = RecursivelyPlanSubqueriesAndCTEs(originalQuery,
231 &context);
232 if (error != NULL)
233 {
234 recursivePlanningDepth--;
235 RaiseDeferredError(error, ERROR);
236 }
237
238 if (context.subPlanList && IsLoggableLevel(DEBUG1))
239 {
240 StringInfo subPlanString = makeStringInfo();
241 pg_get_query_def(originalQuery, subPlanString);
242 ereport(DEBUG1, (errmsg(
243 "Plan " UINT64_FORMAT
244 " query after replacing subqueries and CTEs: %s", planId,
245 ApplyLogRedaction(subPlanString->data))));
246 }
247
248 recursivePlanningDepth--;
249
250 return context.subPlanList;
251 }
252
253
254 /*
255 * RecursivelyPlanSubqueriesAndCTEs finds subqueries and CTEs that cannot be pushed down to
256 * workers directly and instead plans them by recursively calling the planner and
257 * adding the subplan to subPlanList.
258 *
259 * Subplans are executed prior to the distributed plan and the results are written
260 * to temporary files on workers.
261 *
262 * CTE references are replaced by a subquery on the read_intermediate_result
263 * function, which reads from the temporary file.
264 *
265 * If recursive planning results in an error then the error is returned. Otherwise, the
266 * subplans will be added to subPlanList.
267 */
268 static DeferredErrorMessage *
RecursivelyPlanSubqueriesAndCTEs(Query * query,RecursivePlanningContext * context)269 RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context)
270 {
271 DeferredErrorMessage *error = RecursivelyPlanCTEs(query, context);
272 if (error != NULL)
273 {
274 return error;
275 }
276
277 if (SubqueryPushdown)
278 {
279 /*
280 * When the subquery_pushdown flag is enabled we make some hacks
281 * to push down subqueries with LIMIT. Recursive planning would
282 * valiantly do the right thing and try to recursively plan the
283 * inner subqueries, but we don't really want it to because those
284 * subqueries might not be supported and would be much slower.
285 *
286 * Instead, we skip recursive planning altogether when
287 * subquery_pushdown is enabled.
288 */
289 return NULL;
290 }
291
292 /* make sure function calls in joins are executed in the coordinator */
293 WrapFunctionsInSubqueries(query);
294
295 /* descend into subqueries */
296 query_tree_walker(query, RecursivelyPlanSubqueryWalker, context, 0);
297
298 /*
299 * At this point, all CTEs, leaf subqueries containing local tables and
300 * non-pushdownable subqueries have been replaced. We now check for
301 * combinations of subqueries that cannot be pushed down (e.g.
302 * <subquery on reference table> UNION <subquery on distributed table>).
303 *
304 * This code also runs for the top-level query, which allows us to support
305 * top-level set operations.
306 */
307
308 if (ShouldRecursivelyPlanSetOperation(query, context))
309 {
310 RecursivelyPlanSetOperations(query, (Node *) query->setOperations, context);
311 }
312
313 /*
314 * If the FROM clause is recurring (does not contain a distributed table),
315 * then we cannot have any distributed tables appearing in subqueries in
316 * the SELECT and WHERE clauses.
317 */
318 if (ShouldRecursivelyPlanSublinks(query))
319 {
320 /* replace all subqueries in the WHERE clause */
321 if (query->jointree && query->jointree->quals)
322 {
323 RecursivelyPlanAllSubqueries((Node *) query->jointree->quals, context);
324 }
325
326 /* replace all subqueries in the SELECT clause */
327 RecursivelyPlanAllSubqueries((Node *) query->targetList, context);
328 }
329
330 if (query->havingQual != NULL)
331 {
332 if (NodeContainsSubqueryReferencingOuterQuery(query->havingQual))
333 {
334 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
335 "Subqueries in HAVING cannot refer to outer query",
336 NULL, NULL);
337 }
338
339 RecursivelyPlanAllSubqueries(query->havingQual, context);
340 }
341
342 /*
343 * If the query doesn't have distribution key equality,
344 * recursively plan some of its subqueries.
345 */
346 if (ShouldRecursivelyPlanNonColocatedSubqueries(query, context))
347 {
348 RecursivelyPlanNonColocatedSubqueries(query, context);
349 }
350
351
352 if (ShouldConvertLocalTableJoinsToSubqueries(query->rtable))
353 {
354 /*
355 * Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", or
356 * a query with local table/citus local table and subquery. We convert local/citus local
357 * tables to a subquery until they can be planned.
358 */
359 RecursivelyPlanLocalTableJoins(query, context);
360 }
361
362
363 return NULL;
364 }
365
366
367 /*
368 * GetPlannerRestrictionContext returns the planner restriction context
369 * from the given context.
370 */
371 PlannerRestrictionContext *
GetPlannerRestrictionContext(RecursivePlanningContext * recursivePlanningContext)372 GetPlannerRestrictionContext(RecursivePlanningContext *recursivePlanningContext)
373 {
374 return recursivePlanningContext->plannerRestrictionContext;
375 }
376
377
378 /*
379 * ShouldRecursivelyPlanNonColocatedSubqueries returns true if the input query contains joins
380 * that are not on the distribution key.
381 * *
382 * Note that at the point that this function is called, we've already recursively planned all
383 * the leaf subqueries. Thus, we're actually checking whether the joins among the subqueries
384 * on the distribution key or not.
385 */
386 static bool
ShouldRecursivelyPlanNonColocatedSubqueries(Query * subquery,RecursivePlanningContext * context)387 ShouldRecursivelyPlanNonColocatedSubqueries(Query *subquery,
388 RecursivePlanningContext *context)
389 {
390 /*
391 * If the input query already contains the equality, simply return since it is not
392 * possible to find any non colocated subqueries.
393 */
394 if (context->allDistributionKeysInQueryAreEqual)
395 {
396 return false;
397 }
398
399 /*
400 * This check helps us in two ways:
401 * (i) We're not targeting queries that don't include subqueries at all,
402 * they should go through regular planning.
403 * (ii) Lower level subqueries are already recursively planned, so we should
404 * only bother non-colocated subquery joins, which only happens when
405 * there are subqueries.
406 */
407 if (!ContainsSubquery(subquery))
408 {
409 return false;
410 }
411
412 /* direct joins with local tables are not supported by any of Citus planners */
413 if (FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable,
414 IsLocalTableRteOrMatView))
415 {
416 return false;
417 }
418
419 /*
420 * Finally, check whether this subquery contains distribution key equality or not.
421 */
422 if (!AllDistributionKeysInSubqueryAreEqual(subquery,
423 context->plannerRestrictionContext))
424 {
425 return true;
426 }
427
428 return false;
429 }
430
431
432 /*
433 * ContainsSubquery returns true if the input query contains any subqueries
434 * in the FROM or WHERE clauses.
435 */
436 static bool
ContainsSubquery(Query * query)437 ContainsSubquery(Query *query)
438 {
439 return JoinTreeContainsSubquery(query) || WhereOrHavingClauseContainsSubquery(query);
440 }
441
442
443 /*
444 * RecursivelyPlanNonColocatedSubqueries gets a query which includes one or more
445 * other subqueries that are not joined on their distribution keys. The function
446 * tries to recursively plan some of the subqueries to make the input query
447 * executable by Citus.
448 *
449 * The function picks an anchor subquery and iterates on the remaining subqueries.
450 * Whenever it finds a non colocated subquery with the anchor subquery, the function
451 * decides to recursively plan the non colocated subquery.
452 *
453 * The function first handles subqueries in FROM clause (i.e., jointree->fromlist) and then
454 * subqueries in WHERE clause (i.e., jointree->quals).
455 *
456 * The function does not treat outer joins seperately. Thus, we might end up with
457 * a query where the function decides to recursively plan an outer side of an outer
458 * join (i.e., LEFT side of LEFT JOIN). For simplicity, we chose to do so and handle
459 * outer joins with a seperate pass on the join tree.
460 */
461 static void
RecursivelyPlanNonColocatedSubqueries(Query * subquery,RecursivePlanningContext * context)462 RecursivelyPlanNonColocatedSubqueries(Query *subquery, RecursivePlanningContext *context)
463 {
464 FromExpr *joinTree = subquery->jointree;
465
466 /* create the context for the non colocated subquery planning */
467 PlannerRestrictionContext *restrictionContext = context->plannerRestrictionContext;
468 ColocatedJoinChecker colocatedJoinChecker = CreateColocatedJoinChecker(subquery,
469 restrictionContext);
470
471 /*
472 * Although this is a rare case, we weren't able to pick an anchor
473 * range table entry, so we cannot continue.
474 */
475 if (colocatedJoinChecker.anchorRelationRestrictionList == NIL)
476 {
477 return;
478 }
479
480 /* handle from clause subqueries first */
481 RecursivelyPlanNonColocatedJoinWalker((Node *) joinTree, &colocatedJoinChecker,
482 context);
483
484 /* handle subqueries in WHERE clause */
485 RecursivelyPlanNonColocatedSubqueriesInWhere(subquery, &colocatedJoinChecker,
486 context);
487 }
488
489
490 /*
491 * RecursivelyPlanNonColocatedJoinWalker gets a join node and walks over it to find
492 * subqueries that live under the node.
493 *
494 * When a subquery found, it's checked whether the subquery is colocated with the
495 * anchor subquery specified in the nonColocatedJoinContext. If not,
496 * the subquery is recursively planned.
497 */
498 static void
RecursivelyPlanNonColocatedJoinWalker(Node * joinNode,ColocatedJoinChecker * colocatedJoinChecker,RecursivePlanningContext * recursivePlanningContext)499 RecursivelyPlanNonColocatedJoinWalker(Node *joinNode,
500 ColocatedJoinChecker *colocatedJoinChecker,
501 RecursivePlanningContext *recursivePlanningContext)
502 {
503 if (joinNode == NULL)
504 {
505 return;
506 }
507 else if (IsA(joinNode, FromExpr))
508 {
509 FromExpr *fromExpr = (FromExpr *) joinNode;
510 ListCell *fromExprCell;
511
512 /*
513 * For each element of the from list, check whether the element is
514 * colocated with the anchor subquery by recursing until we
515 * find the subqueries.
516 */
517 foreach(fromExprCell, fromExpr->fromlist)
518 {
519 Node *fromElement = (Node *) lfirst(fromExprCell);
520
521 RecursivelyPlanNonColocatedJoinWalker(fromElement, colocatedJoinChecker,
522 recursivePlanningContext);
523 }
524 }
525 else if (IsA(joinNode, JoinExpr))
526 {
527 JoinExpr *joinExpr = (JoinExpr *) joinNode;
528
529 /* recurse into the left subtree */
530 RecursivelyPlanNonColocatedJoinWalker(joinExpr->larg, colocatedJoinChecker,
531 recursivePlanningContext);
532
533 /* recurse into the right subtree */
534 RecursivelyPlanNonColocatedJoinWalker(joinExpr->rarg, colocatedJoinChecker,
535 recursivePlanningContext);
536 }
537 else if (IsA(joinNode, RangeTblRef))
538 {
539 int rangeTableIndex = ((RangeTblRef *) joinNode)->rtindex;
540 List *rangeTableList = colocatedJoinChecker->subquery->rtable;
541 RangeTblEntry *rte = rt_fetch(rangeTableIndex, rangeTableList);
542
543 /* we're only interested in subqueries for now */
544 if (rte->rtekind != RTE_SUBQUERY)
545 {
546 return;
547 }
548
549 /*
550 * If the subquery is not colocated with the anchor subquery,
551 * recursively plan it.
552 */
553 Query *subquery = rte->subquery;
554 if (!SubqueryColocated(subquery, colocatedJoinChecker))
555 {
556 RecursivelyPlanSubquery(subquery, recursivePlanningContext);
557 }
558 }
559 else
560 {
561 pg_unreachable();
562 }
563 }
564
565
566 /*
567 * RecursivelyPlanNonColocatedJoinWalker gets a query and walks over its sublinks
568 * to find subqueries that live in WHERE clause.
569 *
570 * When a subquery found, it's checked whether the subquery is colocated with the
571 * anchor subquery specified in the nonColocatedJoinContext. If not,
572 * the subquery is recursively planned.
573 */
574 static void
RecursivelyPlanNonColocatedSubqueriesInWhere(Query * query,ColocatedJoinChecker * colocatedJoinChecker,RecursivePlanningContext * recursivePlanningContext)575 RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
576 ColocatedJoinChecker *colocatedJoinChecker,
577 RecursivePlanningContext *
578 recursivePlanningContext)
579 {
580 List *sublinkList = SublinkListFromWhere(query);
581 ListCell *sublinkCell = NULL;
582
583 foreach(sublinkCell, sublinkList)
584 {
585 SubLink *sublink = (SubLink *) lfirst(sublinkCell);
586 Query *subselect = (Query *) sublink->subselect;
587
588 /* subselect is probably never NULL, but anyway lets keep the check */
589 if (subselect == NULL)
590 {
591 continue;
592 }
593
594 if (!SubqueryColocated(subselect, colocatedJoinChecker))
595 {
596 RecursivelyPlanSubquery(subselect, recursivePlanningContext);
597 }
598 }
599 }
600
601
602 /*
603 * SublinkListFromWhere finds the subquery nodes in the where clause of the given query. Note
604 * that the function should be called on the original query given that postgres
605 * standard_planner() may convert the subqueries in WHERE clause to joins.
606 */
607 static List *
SublinkListFromWhere(Query * originalQuery)608 SublinkListFromWhere(Query *originalQuery)
609 {
610 FromExpr *joinTree = originalQuery->jointree;
611 List *sublinkList = NIL;
612
613 if (!joinTree)
614 {
615 return NIL;
616 }
617
618 Node *queryQuals = joinTree->quals;
619 ExtractSublinkWalker(queryQuals, &sublinkList);
620
621 return sublinkList;
622 }
623
624
625 /*
626 * ExtractSublinkWalker walks over a quals node, and finds all sublinks
627 * in that node.
628 */
629 static bool
ExtractSublinkWalker(Node * node,List ** sublinkList)630 ExtractSublinkWalker(Node *node, List **sublinkList)
631 {
632 bool walkerResult = false;
633 if (node == NULL)
634 {
635 return false;
636 }
637
638 if (IsA(node, SubLink))
639 {
640 (*sublinkList) = lappend(*sublinkList, node);
641 }
642 else
643 {
644 walkerResult = expression_tree_walker(node, ExtractSublinkWalker,
645 sublinkList);
646 }
647
648 return walkerResult;
649 }
650
651
652 /*
653 * ShouldRecursivelyPlanSublinks returns true if the query has a recurring
654 * FROM clause.
655 */
656 static bool
ShouldRecursivelyPlanSublinks(Query * query)657 ShouldRecursivelyPlanSublinks(Query *query)
658 {
659 if (FindNodeMatchingCheckFunctionInRangeTableList(query->rtable,
660 IsDistributedTableRTE))
661 {
662 /* there is a distributed table in the FROM clause */
663 return false;
664 }
665
666 return true;
667 }
668
669
670 /*
671 * RecursivelyPlanAllSubqueries descends into an expression tree and recursively
672 * plans all subqueries that contain at least one distributed table. The recursive
673 * planning starts from the top of the input query.
674 */
675 static bool
RecursivelyPlanAllSubqueries(Node * node,RecursivePlanningContext * planningContext)676 RecursivelyPlanAllSubqueries(Node *node, RecursivePlanningContext *planningContext)
677 {
678 if (node == NULL)
679 {
680 return false;
681 }
682
683 if (IsA(node, Query))
684 {
685 Query *query = (Query *) node;
686 if (FindNodeMatchingCheckFunctionInRangeTableList(query->rtable, IsCitusTableRTE))
687 {
688 RecursivelyPlanSubquery(query, planningContext);
689 }
690
691 return false;
692 }
693
694 return expression_tree_walker(node, RecursivelyPlanAllSubqueries, planningContext);
695 }
696
697
698 /*
699 * RecursivelyPlanCTEs plans all CTEs in the query by recursively calling the planner
700 * The resulting plan is added to planningContext->subPlanList and CTE references
701 * are replaced by subqueries that call read_intermediate_result, which reads the
702 * intermediate result of the CTE after it is executed.
703 *
704 * Recursive and modifying CTEs are not yet supported and return an error.
705 */
706 static DeferredErrorMessage *
RecursivelyPlanCTEs(Query * query,RecursivePlanningContext * planningContext)707 RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
708 {
709 ListCell *cteCell = NULL;
710 CteReferenceWalkerContext context = { -1, NIL };
711
712 if (query->cteList == NIL)
713 {
714 /* no CTEs, nothing to do */
715 return NULL;
716 }
717
718 if (query->hasRecursive)
719 {
720 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
721 "recursive CTEs are not supported in distributed "
722 "queries",
723 NULL, NULL);
724 }
725
726 /* get all RTE_CTEs that point to CTEs from cteList */
727 CteReferenceListWalker((Node *) query, &context);
728
729 foreach(cteCell, query->cteList)
730 {
731 CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell);
732 char *cteName = cte->ctename;
733 Query *subquery = (Query *) cte->ctequery;
734 uint64 planId = planningContext->planId;
735 List *cteTargetList = NIL;
736 ListCell *rteCell = NULL;
737 int replacedCtesCount = 0;
738
739 if (ContainsReferencesToOuterQuery(subquery))
740 {
741 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
742 "CTEs that refer to other subqueries are not "
743 "supported in multi-shard queries",
744 NULL, NULL);
745 }
746
747 if (cte->cterefcount == 0 && subquery->commandType == CMD_SELECT)
748 {
749 /*
750 * SELECT CTEs that aren't referenced aren't executed in postgres.
751 * We don't need to generate a subplan for it and can take the rest
752 * of this iteration off.
753 */
754 continue;
755 }
756
757 uint32 subPlanId = list_length(planningContext->subPlanList) + 1;
758
759 if (IsLoggableLevel(DEBUG1))
760 {
761 StringInfo subPlanString = makeStringInfo();
762 pg_get_query_def(subquery, subPlanString);
763 ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT
764 "_%u for CTE %s: %s", planId, subPlanId,
765 cteName,
766 ApplyLogRedaction(subPlanString->data))));
767 }
768
769 /* build a sub plan for the CTE */
770 DistributedSubPlan *subPlan = CreateDistributedSubPlan(subPlanId, subquery);
771 planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan);
772
773 /* build the result_id parameter for the call to read_intermediate_result */
774 char *resultId = GenerateResultId(planId, subPlanId);
775
776 if (subquery->returningList)
777 {
778 /* modifying CTE with returning */
779 cteTargetList = subquery->returningList;
780 }
781 else
782 {
783 /* regular SELECT CTE */
784 cteTargetList = subquery->targetList;
785 }
786
787 /* replace references to the CTE with a subquery that reads results */
788 Query *resultQuery = BuildSubPlanResultQuery(cteTargetList, cte->aliascolnames,
789 resultId);
790
791 foreach(rteCell, context.cteReferenceList)
792 {
793 RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rteCell);
794
795 if (rangeTableEntry->rtekind != RTE_CTE)
796 {
797 /*
798 * This RTE pointed to a preceding CTE that was already replaced
799 * by a subplan.
800 */
801 continue;
802 }
803
804 if (strncmp(rangeTableEntry->ctename, cteName, NAMEDATALEN) == 0)
805 {
806 /* change the RTE_CTE into an RTE_SUBQUERY */
807 rangeTableEntry->rtekind = RTE_SUBQUERY;
808 rangeTableEntry->ctename = NULL;
809 rangeTableEntry->ctelevelsup = 0;
810
811 if (replacedCtesCount == 0)
812 {
813 /*
814 * Replace the first CTE reference with the result query directly.
815 */
816 rangeTableEntry->subquery = resultQuery;
817 }
818 else
819 {
820 /*
821 * Replace subsequent CTE references with a copy of the result
822 * query.
823 */
824 rangeTableEntry->subquery = copyObject(resultQuery);
825 }
826
827 replacedCtesCount++;
828 }
829 }
830
831 Assert(cte->cterefcount == replacedCtesCount);
832 }
833
834 /*
835 * All CTEs are now executed through subplans and RTE_CTEs pointing
836 * to the CTE list have been replaced with subqueries. We can now
837 * clear the cteList.
838 */
839 query->cteList = NIL;
840
841 return NULL;
842 }
843
844
845 /*
846 * RecursivelyPlanSubqueryWalker recursively finds all the Query nodes and
847 * recursively plans if necessary.
848 */
849 static bool
RecursivelyPlanSubqueryWalker(Node * node,RecursivePlanningContext * context)850 RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context)
851 {
852 if (node == NULL)
853 {
854 return false;
855 }
856
857 if (IsA(node, Query))
858 {
859 Query *query = (Query *) node;
860
861 context->level += 1;
862
863 /*
864 * First, make sure any subqueries and CTEs within this subquery
865 * are recursively planned if necessary.
866 */
867 DeferredErrorMessage *error = RecursivelyPlanSubqueriesAndCTEs(query, context);
868 if (error != NULL)
869 {
870 RaiseDeferredError(error, ERROR);
871 }
872 context->level -= 1;
873
874 /*
875 * Recursively plan this subquery if it cannot be pushed down and is
876 * eligible for recursive planning.
877 */
878 if (ShouldRecursivelyPlanSubquery(query, context))
879 {
880 RecursivelyPlanSubquery(query, context);
881 }
882
883 /* we're done, no need to recurse anymore for this query */
884 return false;
885 }
886
887 return expression_tree_walker(node, RecursivelyPlanSubqueryWalker, context);
888 }
889
890
891 /*
892 * ShouldRecursivelyPlanSubquery decides whether the input subquery should be recursively
893 * planned or not.
894 *
895 * For the details, see the cases in the function.
896 */
897 static bool
ShouldRecursivelyPlanSubquery(Query * subquery,RecursivePlanningContext * context)898 ShouldRecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *context)
899 {
900 if (FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable,
901 IsLocalTableRteOrMatView))
902 {
903 /*
904 * Postgres can always plan queries that don't require distributed planning.
905 * Note that we need to check this first, otherwise the calls to the many other
906 * Citus planner functions would error our due to local relations.
907 *
908 * TODO: We could only successfully create distributed plans with local tables
909 * when the local tables are on the leaf queries and the upper level queries
910 * do not contain any other local tables.
911 */
912 }
913 else if (CanPushdownSubquery(subquery, false))
914 {
915 /*
916 * We should do one more check for the distribution key equality.
917 *
918 * If the input query to the planner doesn't contain distribution key equality,
919 * we should further check whether this individual subquery contains or not.
920 *
921 * If all relations are not joined on their distribution keys for the given
922 * subquery, we cannot push push it down and therefore we should try to
923 * recursively plan it.
924 */
925 if (!context->allDistributionKeysInQueryAreEqual &&
926 !AllDistributionKeysInSubqueryAreEqual(subquery,
927 context->plannerRestrictionContext))
928 {
929 return true;
930 }
931
932 /*
933 * Citus can pushdown this subquery, no need to recursively
934 * plan which is much more expensive than pushdown.
935 */
936 return false;
937 }
938
939 return true;
940 }
941
942
943 /*
944 * AllDistributionKeysInSubqueryAreEqual is a wrapper function
945 * for AllDistributionKeysInQueryAreEqual(). Here, we filter the
946 * planner restrictions for the given subquery and do the restriction
947 * equality checks on the filtered restriction.
948 */
949 static bool
AllDistributionKeysInSubqueryAreEqual(Query * subquery,PlannerRestrictionContext * restrictionContext)950 AllDistributionKeysInSubqueryAreEqual(Query *subquery,
951 PlannerRestrictionContext *restrictionContext)
952 {
953 /* we don't support distribution eq. checks for CTEs yet */
954 if (subquery->cteList != NIL)
955 {
956 return false;
957 }
958
959 PlannerRestrictionContext *filteredRestrictionContext =
960 FilterPlannerRestrictionForQuery(restrictionContext, subquery);
961
962 bool allDistributionKeysInSubqueryAreEqual =
963 AllDistributionKeysInQueryAreEqual(subquery, filteredRestrictionContext);
964 if (!allDistributionKeysInSubqueryAreEqual)
965 {
966 return false;
967 }
968
969 return true;
970 }
971
972
973 /*
974 * ShouldRecursivelyPlanSetOperation determines whether the leaf queries of a
975 * set operations tree need to be recursively planned in order to support the
976 * query as a whole.
977 */
978 static bool
ShouldRecursivelyPlanSetOperation(Query * query,RecursivePlanningContext * context)979 ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *context)
980 {
981 SetOperationStmt *setOperations = (SetOperationStmt *) query->setOperations;
982 if (setOperations == NULL)
983 {
984 return false;
985 }
986
987 if (context->level == 0)
988 {
989 /*
990 * We cannot push down top-level set operation. Recursively plan the
991 * leaf nodes such that it becomes a router query.
992 */
993 return true;
994 }
995
996 if (setOperations->op != SETOP_UNION)
997 {
998 /*
999 * We can only push down UNION operaionts, plan other set operations
1000 * recursively.
1001 */
1002 return true;
1003 }
1004
1005 if (DeferErrorIfUnsupportedUnionQuery(query) != NULL)
1006 {
1007 /*
1008 * If at least one leaf query in the union is recurring, then all
1009 * leaf nodes need to be recurring.
1010 */
1011 return true;
1012 }
1013
1014 PlannerRestrictionContext *filteredRestrictionContext =
1015 FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query);
1016 if (!SafeToPushdownUnionSubquery(query, filteredRestrictionContext))
1017 {
1018 /*
1019 * The distribution column is not in the same place in all sides
1020 * of the union, meaning we cannot determine distribution column
1021 * equivalence. Recursive planning is necessary.
1022 */
1023 return true;
1024 }
1025
1026 return false;
1027 }
1028
1029
1030 /*
1031 * RecursivelyPlanSetOperations descends into a tree of set operations
1032 * (e.g. UNION, INTERSECTS) and recursively plans all leaf nodes that
1033 * contain distributed tables.
1034 */
1035 static void
RecursivelyPlanSetOperations(Query * query,Node * node,RecursivePlanningContext * context)1036 RecursivelyPlanSetOperations(Query *query, Node *node,
1037 RecursivePlanningContext *context)
1038 {
1039 if (IsA(node, SetOperationStmt))
1040 {
1041 SetOperationStmt *setOperations = (SetOperationStmt *) node;
1042
1043 RecursivelyPlanSetOperations(query, setOperations->larg, context);
1044 RecursivelyPlanSetOperations(query, setOperations->rarg, context);
1045 }
1046 else if (IsA(node, RangeTblRef))
1047 {
1048 RangeTblRef *rangeTableRef = (RangeTblRef *) node;
1049 RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableRef->rtindex,
1050 query->rtable);
1051 Query *subquery = rangeTableEntry->subquery;
1052
1053 if (rangeTableEntry->rtekind == RTE_SUBQUERY &&
1054 FindNodeMatchingCheckFunction((Node *) subquery, IsDistributedTableRTE))
1055 {
1056 RecursivelyPlanSubquery(subquery, context);
1057 }
1058 }
1059 else
1060 {
1061 ereport(ERROR, (errmsg("unexpected node type (%d) while "
1062 "expecting set operations or "
1063 "range table references", nodeTag(node))));
1064 }
1065 }
1066
1067
1068 /*
1069 * IsLocalTableRteOrMatView gets a node and returns true if the node is a range
1070 * table entry that points to a postgres local or citus local table or to a
1071 * materialized view.
1072 */
1073 static bool
IsLocalTableRteOrMatView(Node * node)1074 IsLocalTableRteOrMatView(Node *node)
1075 {
1076 if (node == NULL)
1077 {
1078 return false;
1079 }
1080
1081 if (!IsA(node, RangeTblEntry))
1082 {
1083 return false;
1084 }
1085
1086 RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
1087 if (rangeTableEntry->rtekind != RTE_RELATION)
1088 {
1089 return false;
1090 }
1091
1092 if (rangeTableEntry->relkind == RELKIND_VIEW)
1093 {
1094 return false;
1095 }
1096
1097 Oid relationId = rangeTableEntry->relid;
1098 return IsRelationLocalTableOrMatView(relationId);
1099 }
1100
1101
1102 /*
1103 * IsRelationLocalTableOrMatView returns true if the given relation
1104 * is a citus local, local, or materialized view.
1105 */
1106 bool
IsRelationLocalTableOrMatView(Oid relationId)1107 IsRelationLocalTableOrMatView(Oid relationId)
1108 {
1109 if (!IsCitusTable(relationId))
1110 {
1111 /* postgres local table or a materialized view */
1112 return true;
1113 }
1114 else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
1115 {
1116 return true;
1117 }
1118
1119 /* no local table found */
1120 return false;
1121 }
1122
1123
1124 /*
1125 * RecursivelyPlanQuery recursively plans a query, replaces it with a
1126 * result query and returns the subplan.
1127 *
1128 * Before we recursively plan the given subquery, we should ensure
1129 * that the subquery doesn't contain any references to the outer
1130 * queries (i.e., such queries cannot be separately planned). In
1131 * that case, the function doesn't recursively plan the input query
1132 * and immediately returns. Later, the planner decides on what to do
1133 * with the query.
1134 */
1135 static bool
RecursivelyPlanSubquery(Query * subquery,RecursivePlanningContext * planningContext)1136 RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningContext)
1137 {
1138 uint64 planId = planningContext->planId;
1139 Query *debugQuery = NULL;
1140
1141 if (ContainsReferencesToOuterQuery(subquery))
1142 {
1143 elog(DEBUG2, "skipping recursive planning for the subquery since it "
1144 "contains references to outer queries");
1145
1146 return false;
1147 }
1148
1149 /*
1150 * Subquery will go through the standard planner, thus to properly deparse it
1151 * we keep its copy: debugQuery.
1152 */
1153 if (IsLoggableLevel(DEBUG1))
1154 {
1155 debugQuery = copyObject(subquery);
1156 }
1157
1158
1159 /*
1160 * Create the subplan and append it to the list in the planning context.
1161 */
1162 int subPlanId = list_length(planningContext->subPlanList) + 1;
1163
1164 DistributedSubPlan *subPlan = CreateDistributedSubPlan(subPlanId, subquery);
1165 planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan);
1166
1167 /* build the result_id parameter for the call to read_intermediate_result */
1168 char *resultId = GenerateResultId(planId, subPlanId);
1169
1170 /*
1171 * BuildSubPlanResultQuery() can optionally use provided column aliases.
1172 * We do not need to send additional alias list for subqueries.
1173 */
1174 Query *resultQuery = BuildSubPlanResultQuery(subquery->targetList, NIL, resultId);
1175
1176 if (IsLoggableLevel(DEBUG1))
1177 {
1178 StringInfo subqueryString = makeStringInfo();
1179
1180 pg_get_query_def(debugQuery, subqueryString);
1181
1182 ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT
1183 "_%u for subquery %s", planId, subPlanId,
1184 ApplyLogRedaction(subqueryString->data))));
1185 }
1186
1187 /* finally update the input subquery to point the result query */
1188 *subquery = *resultQuery;
1189 return true;
1190 }
1191
1192
1193 /*
1194 * CreateDistributedSubPlan creates a distributed subplan by recursively calling
1195 * the planner from the top, which may either generate a local plan or another
1196 * distributed plan, which can itself contain subplans.
1197 */
1198 static DistributedSubPlan *
CreateDistributedSubPlan(uint32 subPlanId,Query * subPlanQuery)1199 CreateDistributedSubPlan(uint32 subPlanId, Query *subPlanQuery)
1200 {
1201 int cursorOptions = 0;
1202
1203 if (ContainsReadIntermediateResultFunction((Node *) subPlanQuery))
1204 {
1205 /*
1206 * Make sure we go through distributed planning if there are
1207 * read_intermediate_result calls, even if there are no distributed
1208 * tables in the query anymore.
1209 *
1210 * We cannot perform this check in the planner itself, since that
1211 * would also cause the workers to attempt distributed planning.
1212 */
1213 cursorOptions |= CURSOR_OPT_FORCE_DISTRIBUTED;
1214 }
1215
1216 DistributedSubPlan *subPlan = CitusMakeNode(DistributedSubPlan);
1217 subPlan->plan = planner_compat(subPlanQuery, cursorOptions, NULL);
1218 subPlan->subPlanId = subPlanId;
1219
1220 return subPlan;
1221 }
1222
1223
1224 /*
1225 * CteReferenceListWalker finds all references to CTEs in the top level of a query
1226 * and adds them to context->cteReferenceList.
1227 */
1228 static bool
CteReferenceListWalker(Node * node,CteReferenceWalkerContext * context)1229 CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context)
1230 {
1231 if (node == NULL)
1232 {
1233 return false;
1234 }
1235
1236 if (IsA(node, RangeTblEntry))
1237 {
1238 RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
1239
1240 if (rangeTableEntry->rtekind == RTE_CTE &&
1241 rangeTableEntry->ctelevelsup == context->level)
1242 {
1243 context->cteReferenceList = lappend(context->cteReferenceList,
1244 rangeTableEntry);
1245 }
1246
1247 /* caller will descend into range table entry */
1248 return false;
1249 }
1250 else if (IsA(node, Query))
1251 {
1252 Query *query = (Query *) node;
1253
1254 context->level += 1;
1255 query_tree_walker(query, CteReferenceListWalker, context,
1256 QTW_EXAMINE_RTES_BEFORE);
1257 context->level -= 1;
1258
1259 return false;
1260 }
1261 else
1262 {
1263 return expression_tree_walker(node, CteReferenceListWalker, context);
1264 }
1265 }
1266
1267
1268 /*
1269 * ContainsReferencesToOuterQuery determines whether the given query contains
1270 * anything that points outside of the query itself. Such queries cannot be
1271 * planned recursively.
1272 */
1273 bool
ContainsReferencesToOuterQuery(Query * query)1274 ContainsReferencesToOuterQuery(Query *query)
1275 {
1276 VarLevelsUpWalkerContext context = { 0 };
1277 int flags = 0;
1278
1279 return query_tree_walker(query, ContainsReferencesToOuterQueryWalker,
1280 &context, flags);
1281 }
1282
1283
1284 /*
1285 * ContainsReferencesToOuterQueryWalker determines whether the given query
1286 * contains any Vars that point more than context->level levels up.
1287 *
1288 * ContainsReferencesToOuterQueryWalker recursively descends into subqueries
1289 * and increases the level by 1 before recursing.
1290 */
1291 static bool
ContainsReferencesToOuterQueryWalker(Node * node,VarLevelsUpWalkerContext * context)1292 ContainsReferencesToOuterQueryWalker(Node *node, VarLevelsUpWalkerContext *context)
1293 {
1294 if (node == NULL)
1295 {
1296 return false;
1297 }
1298
1299 if (IsA(node, Var))
1300 {
1301 if (((Var *) node)->varlevelsup > context->level)
1302 {
1303 return true;
1304 }
1305
1306 return false;
1307 }
1308 else if (IsA(node, Aggref))
1309 {
1310 if (((Aggref *) node)->agglevelsup > context->level)
1311 {
1312 return true;
1313 }
1314 }
1315 else if (IsA(node, GroupingFunc))
1316 {
1317 if (((GroupingFunc *) node)->agglevelsup > context->level)
1318 {
1319 return true;
1320 }
1321
1322 return false;
1323 }
1324 else if (IsA(node, PlaceHolderVar))
1325 {
1326 if (((PlaceHolderVar *) node)->phlevelsup > context->level)
1327 {
1328 return true;
1329 }
1330 }
1331 else if (IsA(node, Query))
1332 {
1333 Query *query = (Query *) node;
1334 int flags = 0;
1335
1336 context->level += 1;
1337 bool found = query_tree_walker(query, ContainsReferencesToOuterQueryWalker,
1338 context, flags);
1339 context->level -= 1;
1340
1341 return found;
1342 }
1343
1344 return expression_tree_walker(node, ContainsReferencesToOuterQueryWalker,
1345 context);
1346 }
1347
1348
1349 /*
1350 * NodeContainsSubqueryReferencingOuterQuery determines whether the given node
1351 * contains anything that points outside of the query itself.
1352 */
1353 static bool
NodeContainsSubqueryReferencingOuterQuery(Node * node)1354 NodeContainsSubqueryReferencingOuterQuery(Node *node)
1355 {
1356 List *sublinks = NIL;
1357 ExtractSublinkWalker(node, &sublinks);
1358
1359 SubLink *sublink;
1360 foreach_ptr(sublink, sublinks)
1361 {
1362 if (ContainsReferencesToOuterQuery(castNode(Query, sublink->subselect)))
1363 {
1364 return true;
1365 }
1366 }
1367
1368 return false;
1369 }
1370
1371
1372 /*
1373 * ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry
1374 * with a subquery. The function also pushes down the filters to the subquery.
1375 *
1376 * It then recursively plans the subquery. This subquery is wrapped with another subquery
1377 * as a trick to reduce network cost, because we currently don't have an easy way to
1378 * skip generating NULL's for non-required columns, and if we create (SELECT a, NULL, NULL FROM table)
1379 * then this will be sent over network and NULL's also occupy some space. Instead of this we generate:
1380 * (SELECT t.a, NULL, NULL FROM (SELECT a FROM table) t). The inner subquery will be recursively planned
1381 * but the outer part will not be yet it will still have the NULL columns so that the query is correct.
1382 */
1383 void
ReplaceRTERelationWithRteSubquery(RangeTblEntry * rangeTableEntry,List * requiredAttrNumbers,RecursivePlanningContext * context)1384 ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
1385 List *requiredAttrNumbers,
1386 RecursivePlanningContext *context)
1387 {
1388 Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers);
1389 List *outerQueryTargetList = CreateAllTargetListForRelation(rangeTableEntry->relid,
1390 requiredAttrNumbers);
1391
1392 List *restrictionList =
1393 GetRestrictInfoListForRelation(rangeTableEntry,
1394 context->plannerRestrictionContext);
1395 List *copyRestrictionList = copyObject(restrictionList);
1396 Expr *andedBoundExpressions = make_ands_explicit(copyRestrictionList);
1397 subquery->jointree->quals = (Node *) andedBoundExpressions;
1398
1399 /*
1400 * Originally the quals were pointing to the RTE and its varno
1401 * was pointing to its index in rtable. However now we converted the RTE
1402 * to a subquery and the quals should be pointing to that subquery, which
1403 * is the only RTE in its rtable, hence we update the varnos so that they
1404 * point to the subquery RTE.
1405 * Originally: rtable: [rte1, current_rte, rte3...]
1406 * Now: rtable: [rte1, subquery[current_rte], rte3...] --subquery[current_rte] refers to its rtable.
1407 */
1408 Node *quals = subquery->jointree->quals;
1409 UpdateVarNosInNode(quals, SINGLE_RTE_INDEX);
1410
1411 /* replace the function with the constructed subquery */
1412 rangeTableEntry->rtekind = RTE_SUBQUERY;
1413 rangeTableEntry->subquery = subquery;
1414
1415 /*
1416 * If the relation is inherited, it'll still be inherited as
1417 * we've copied it earlier. This is to prevent the newly created
1418 * subquery being treated as inherited.
1419 */
1420 rangeTableEntry->inh = false;
1421
1422 if (IsLoggableLevel(DEBUG1))
1423 {
1424 char *relationAndAliasName = GetRelationNameAndAliasName(rangeTableEntry);
1425 ereport(DEBUG1, (errmsg("Wrapping relation %s to a subquery",
1426 relationAndAliasName)));
1427 }
1428
1429 /* as we created the subquery, now forcefully recursively plan it */
1430 bool recursivelyPlanned = RecursivelyPlanSubquery(subquery, context);
1431 if (!recursivelyPlanned)
1432 {
1433 ereport(ERROR, (errmsg(
1434 "unexpected state: query should have been recursively planned")));
1435 }
1436
1437 Query *outerSubquery = CreateOuterSubquery(rangeTableEntry, outerQueryTargetList);
1438 rangeTableEntry->subquery = outerSubquery;
1439 }
1440
1441
1442 /*
1443 * GetRelationNameAndAliasName returns the relname + alias name if
1444 * alias name exists otherwise only the relname is returned.
1445 */
1446 static char *
GetRelationNameAndAliasName(RangeTblEntry * rangeTableEntry)1447 GetRelationNameAndAliasName(RangeTblEntry *rangeTableEntry)
1448 {
1449 StringInfo str = makeStringInfo();
1450 appendStringInfo(str, "\"%s\"", get_rel_name(rangeTableEntry->relid));
1451
1452 char *aliasName = NULL;
1453 if (rangeTableEntry->alias)
1454 {
1455 aliasName = rangeTableEntry->alias->aliasname;
1456 }
1457
1458 if (aliasName)
1459 {
1460 appendStringInfo(str, " \"%s\"", aliasName);
1461 }
1462 return str->data;
1463 }
1464
1465
1466 /*
1467 * CreateOuterSubquery creates outer subquery which contains
1468 * the given range table entry in its rtable.
1469 */
1470 static Query *
CreateOuterSubquery(RangeTblEntry * rangeTableEntry,List * outerSubqueryTargetList)1471 CreateOuterSubquery(RangeTblEntry *rangeTableEntry, List *outerSubqueryTargetList)
1472 {
1473 List *innerSubqueryColNames = GenerateRequiredColNamesFromTargetList(
1474 outerSubqueryTargetList);
1475
1476 Query *outerSubquery = makeNode(Query);
1477 outerSubquery->commandType = CMD_SELECT;
1478
1479 /* we copy the input rteRelation to preserve the rteIdentity */
1480 RangeTblEntry *innerSubqueryRTE = copyObject(rangeTableEntry);
1481
1482 innerSubqueryRTE->eref->colnames = innerSubqueryColNames;
1483 outerSubquery->rtable = list_make1(innerSubqueryRTE);
1484
1485 /* set the FROM expression to the subquery */
1486 RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
1487 newRangeTableRef->rtindex = 1;
1488 outerSubquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
1489
1490 outerSubquery->targetList = outerSubqueryTargetList;
1491 return outerSubquery;
1492 }
1493
1494
1495 /*
1496 * GenerateRequiredColNamesFromTargetList generates the required colnames
1497 * from the given target list.
1498 */
1499 static List *
GenerateRequiredColNamesFromTargetList(List * targetList)1500 GenerateRequiredColNamesFromTargetList(List *targetList)
1501 {
1502 TargetEntry *entry = NULL;
1503 List *innerSubqueryColNames = NIL;
1504 foreach_ptr(entry, targetList)
1505 {
1506 if (IsA(entry->expr, Var))
1507 {
1508 /*
1509 * column names of the inner subquery should only contain the
1510 * required columns, as in if we choose 'b' from ('a','b') colnames
1511 * should be 'a' not ('a','b')
1512 */
1513 innerSubqueryColNames = lappend(innerSubqueryColNames, makeString(
1514 entry->resname));
1515 }
1516 }
1517 return innerSubqueryColNames;
1518 }
1519
1520
1521 /*
1522 * UpdateVarNosInNode iterates the Vars in the
1523 * given node and updates the varno's as the newVarNo.
1524 */
1525 static void
UpdateVarNosInNode(Node * node,Index newVarNo)1526 UpdateVarNosInNode(Node *node, Index newVarNo)
1527 {
1528 List *varList = pull_var_clause(node, PVC_RECURSE_AGGREGATES |
1529 PVC_RECURSE_PLACEHOLDERS);
1530 Var *var = NULL;
1531 foreach_ptr(var, varList)
1532 {
1533 var->varno = newVarNo;
1534 }
1535 }
1536
1537
1538 /*
1539 * IsRecursivelyPlannableRelation returns true if the given range table entry
1540 * is a relation type that can be converted to a subquery.
1541 */
1542 bool
IsRecursivelyPlannableRelation(RangeTblEntry * rangeTableEntry)1543 IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry)
1544 {
1545 if (rangeTableEntry->rtekind != RTE_RELATION)
1546 {
1547 return false;
1548 }
1549 return rangeTableEntry->relkind == RELKIND_PARTITIONED_TABLE ||
1550 rangeTableEntry->relkind == RELKIND_RELATION ||
1551 rangeTableEntry->relkind == RELKIND_MATVIEW ||
1552 rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE;
1553 }
1554
1555
1556 /*
1557 * ContainsLocalTableDistributedTableJoin returns true if the input range table list
1558 * contains a direct join between local RTE and an RTE that contains a distributed
1559 * or reference table.
1560 */
1561 bool
ContainsLocalTableDistributedTableJoin(List * rangeTableList)1562 ContainsLocalTableDistributedTableJoin(List *rangeTableList)
1563 {
1564 bool containsLocalTable = false;
1565 bool containsDistributedTable = false;
1566
1567 RangeTblEntry *rangeTableEntry = NULL;
1568 foreach_ptr(rangeTableEntry, rangeTableList)
1569 {
1570 if (FindNodeMatchingCheckFunctionInRangeTableList(list_make1(rangeTableEntry),
1571 IsDistributedOrReferenceTableRTE))
1572 {
1573 containsDistributedTable = true;
1574 }
1575 else if (IsRecursivelyPlannableRelation(rangeTableEntry) &&
1576 IsLocalTableRteOrMatView((Node *) rangeTableEntry))
1577 {
1578 /* we consider citus local tables as local table */
1579 containsLocalTable = true;
1580 }
1581 }
1582
1583 return containsLocalTable && containsDistributedTable;
1584 }
1585
1586
1587 /*
1588 * WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries
1589 * of a query and wraps the functions inside (SELECT * FROM fnc() f)
1590 * subqueries, so that those functions will be executed on the coordinator if
1591 * necessary.
1592 *
1593 * We wrap all the functions that are used in joins except the ones that are
1594 * laterally joined or have WITH ORDINALITY clauses.
1595 * */
1596 static void
WrapFunctionsInSubqueries(Query * query)1597 WrapFunctionsInSubqueries(Query *query)
1598 {
1599 List *rangeTableList = query->rtable;
1600 ListCell *rangeTableCell = NULL;
1601
1602 /*
1603 * If we have only one function call in a query without any joins, we can
1604 * easily decide where to execute it.
1605 *
1606 * If there are some subqueries and/or functions that are joined with a
1607 * function, it is not trivial to decide whether we should run this
1608 * function in the coordinator or in workers and therefore we may need to
1609 * wrap some of those functions in subqueries.
1610 *
1611 * If we have only one RTE, we leave the parsed query tree as it is. This
1612 * also makes sure we do not wrap an already wrapped function call
1613 * because we know that there will always be 1 RTE in a wrapped function.
1614 * */
1615 if (list_length(rangeTableList) < 2)
1616 {
1617 return;
1618 }
1619
1620 /* iterate over all RTEs and wrap them if necessary */
1621 foreach(rangeTableCell, rangeTableList)
1622 {
1623 RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
1624
1625 if (ShouldTransformRTE(rangeTableEntry))
1626 {
1627 TransformFunctionRTE(rangeTableEntry);
1628 }
1629 }
1630 }
1631
1632
1633 /*
1634 * TransformFunctionRTE wraps a given function RangeTableEntry
1635 * inside a (SELECT * from function() f) subquery.
1636 *
1637 * The said RangeTableEntry is modified and now points to the new subquery.
1638 * */
1639 static void
TransformFunctionRTE(RangeTblEntry * rangeTblEntry)1640 TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
1641 {
1642 Query *subquery = makeNode(Query);
1643 RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
1644 Var *targetColumn = NULL;
1645 TargetEntry *targetEntry = NULL;
1646 AttrNumber targetColumnIndex = 0;
1647
1648 RangeTblFunction *rangeTblFunction = linitial(rangeTblEntry->functions);
1649
1650 subquery->commandType = CMD_SELECT;
1651
1652 /* copy the input rangeTblEntry to prevent cycles */
1653 RangeTblEntry *newRangeTableEntry = copyObject(rangeTblEntry);
1654
1655 /* set the FROM expression to the subquery */
1656 subquery->rtable = list_make1(newRangeTableEntry);
1657 newRangeTableRef->rtindex = 1;
1658 subquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
1659
1660 /* Determine the result type of the function.
1661 *
1662 * If function return type is not composite or rowtype can't be determined,
1663 * tupleDesc is set to null here
1664 */
1665 TupleDesc tupleDesc = (TupleDesc) get_expr_result_tupdesc(rangeTblFunction->funcexpr,
1666 true);
1667
1668 /*
1669 * If tupleDesc is not null, we iterate over all the attributes and
1670 * create targetEntries
1671 * */
1672 if (tupleDesc)
1673 {
1674 /*
1675 * A sample function join that end up here:
1676 *
1677 * CREATE FUNCTION f(..) RETURNS TABLE(c1 int, c2 text) AS .. ;
1678 * SELECT .. FROM table JOIN f(..) ON ( .. ) ;
1679 *
1680 * We will iterate over Tuple Description attributes. i.e (c1 int, c2 text)
1681 */
1682 if (tupleDesc->natts > MaxAttrNumber)
1683 {
1684 ereport(ERROR, (errmsg("bad number of tuple descriptor attributes")));
1685 }
1686 AttrNumber natts = tupleDesc->natts;
1687 for (targetColumnIndex = 0; targetColumnIndex < natts;
1688 targetColumnIndex++)
1689 {
1690 FormData_pg_attribute *attribute = TupleDescAttr(tupleDesc,
1691 targetColumnIndex);
1692 Oid columnType = attribute->atttypid;
1693 char *columnName = attribute->attname.data;
1694
1695 /*
1696 * The indexing of attributes and TupleDesc and varattno differ
1697 *
1698 * varattno=0 corresponds to whole row
1699 * varattno=1 corresponds to first column that is stored in tupDesc->attrs[0]
1700 *
1701 * That's why we need to add one to the targetColumnIndex
1702 * */
1703 targetColumn = makeVar(1, targetColumnIndex + 1, columnType, -1, InvalidOid,
1704 0);
1705 targetEntry = makeTargetEntry((Expr *) targetColumn, targetColumnIndex + 1,
1706 columnName, false);
1707 subquery->targetList = lappend(subquery->targetList, targetEntry);
1708 }
1709 }
1710
1711 /*
1712 * If tupleDesc is NULL we have 2 different cases:
1713 *
1714 * 1. The function returns a record but the attributes can not be
1715 * determined just by looking at the function definition. In this case the
1716 * column names and types must be defined explicitly in the query
1717 *
1718 * 2. The function returns a non-composite type (e.g. int, text, jsonb ..)
1719 * */
1720 else
1721 {
1722 /* create target entries for all columns returned by the function */
1723 ListCell *functionColumnName = NULL;
1724
1725 List *functionColumnNames = rangeTblEntry->eref->colnames;
1726 foreach(functionColumnName, functionColumnNames)
1727 {
1728 char *columnName = strVal(lfirst(functionColumnName));
1729 Oid columnType = InvalidOid;
1730
1731 /*
1732 * If the function returns a set of records, the query needs
1733 * to explicitly name column names and types
1734 *
1735 * Use explicitly defined types in the query if they are
1736 * available
1737 * */
1738 if (list_length(rangeTblFunction->funccoltypes) > 0)
1739 {
1740 /*
1741 * A sample function join that end up here:
1742 *
1743 * CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS
1744 * $cmd$
1745 * SELECT x, x+1 FROM generate_series(0,4) f(x)
1746 * $cmd$
1747 * LANGUAGE SQL;
1748 *
1749 * SELECT *
1750 * FROM table1 JOIN get_set_of_records() AS t2(x int, y int)
1751 * ON (id = x);
1752 *
1753 * Note that the function definition does not have column
1754 * names and types. Therefore the user needs to explicitly
1755 * state them in the query
1756 * */
1757 columnType = list_nth_oid(rangeTblFunction->funccoltypes,
1758 targetColumnIndex);
1759 }
1760
1761 /* use the types in the function definition otherwise */
1762 else
1763 {
1764 /*
1765 * Only functions returning simple types end up here.
1766 * A sample function:
1767 *
1768 * CREATE FUNCTION add(integer, integer) RETURNS integer AS
1769 * 'SELECT $1 + $2;'
1770 * LANGUAGE SQL;
1771 * SELECT * FROM table JOIN add(3,5) sum ON ( .. ) ;
1772 * */
1773 FuncExpr *funcExpr = (FuncExpr *) rangeTblFunction->funcexpr;
1774 columnType = funcExpr->funcresulttype;
1775 }
1776
1777 /* Note that the column k is associated with varattno/resno of k+1 */
1778 targetColumn = makeVar(1, targetColumnIndex + 1, columnType, -1,
1779 InvalidOid, 0);
1780 targetEntry = makeTargetEntry((Expr *) targetColumn,
1781 targetColumnIndex + 1, columnName, false);
1782 subquery->targetList = lappend(subquery->targetList, targetEntry);
1783
1784 targetColumnIndex++;
1785 }
1786 }
1787
1788 /* replace the function with the constructed subquery */
1789 rangeTblEntry->rtekind = RTE_SUBQUERY;
1790 rangeTblEntry->subquery = subquery;
1791 }
1792
1793
1794 /*
1795 * ShouldTransformRTE determines whether a given RTE should bne wrapped in a
1796 * subquery.
1797 *
1798 * Not all functions should be wrapped in a subquery for now. As we support more
1799 * functions to be used in joins, the constraints here will be relaxed.
1800 * */
1801 static bool
ShouldTransformRTE(RangeTblEntry * rangeTableEntry)1802 ShouldTransformRTE(RangeTblEntry *rangeTableEntry)
1803 {
1804 /*
1805 * We should wrap only function rtes that are not LATERAL and
1806 * without WITH ORDINALITY clause
1807 */
1808 if (rangeTableEntry->rtekind != RTE_FUNCTION ||
1809 rangeTableEntry->lateral ||
1810 rangeTableEntry->funcordinality)
1811 {
1812 return false;
1813 }
1814 return true;
1815 }
1816
1817
1818 /*
1819 * BuildSubPlanResultQuery returns a query of the form:
1820 *
1821 * SELECT
1822 * <target list>
1823 * FROM
1824 * read_intermediate_result('<resultId>', '<copy format'>)
1825 * AS res (<column definition list>);
1826 *
1827 * The caller can optionally supply a columnAliasList, which is useful for
1828 * CTEs that have column aliases.
1829 *
1830 * If any of the types in the target list cannot be used in the binary copy format,
1831 * then the copy format 'text' is used, otherwise 'binary' is used.
1832 */
1833 Query *
BuildSubPlanResultQuery(List * targetEntryList,List * columnAliasList,char * resultId)1834 BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList, char *resultId)
1835 {
1836 Oid functionOid = CitusReadIntermediateResultFuncId();
1837 bool useBinaryCopyFormat = CanUseBinaryCopyFormatForTargetList(targetEntryList);
1838
1839 Const *resultIdConst = makeNode(Const);
1840 resultIdConst->consttype = TEXTOID;
1841 resultIdConst->consttypmod = -1;
1842 resultIdConst->constlen = -1;
1843 resultIdConst->constvalue = CStringGetTextDatum(resultId);
1844 resultIdConst->constbyval = false;
1845 resultIdConst->constisnull = false;
1846 resultIdConst->location = -1;
1847
1848 return BuildReadIntermediateResultsQuery(targetEntryList, columnAliasList,
1849 resultIdConst, functionOid,
1850 useBinaryCopyFormat);
1851 }
1852
1853
1854 /*
1855 * BuildReadIntermediateResultsArrayQuery returns a query of the form:
1856 *
1857 * SELECT
1858 * <target list>
1859 * FROM
1860 * read_intermediate_results(ARRAY['<resultId>', ...]::text[], '<copy format'>)
1861 * AS res (<column definition list>);
1862 *
1863 * The caller can optionally supply a columnAliasList, which is useful for
1864 * CTEs that have column aliases.
1865 *
1866 * If useBinaryCopyFormat is true, then 'binary' format is used. Otherwise,
1867 * 'text' format is used.
1868 */
1869 Query *
BuildReadIntermediateResultsArrayQuery(List * targetEntryList,List * columnAliasList,List * resultIdList,bool useBinaryCopyFormat)1870 BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
1871 List *columnAliasList,
1872 List *resultIdList,
1873 bool useBinaryCopyFormat)
1874 {
1875 Oid functionOid = CitusReadIntermediateResultArrayFuncId();
1876
1877 Const *resultIdConst = makeNode(Const);
1878 resultIdConst->consttype = TEXTARRAYOID;
1879 resultIdConst->consttypmod = -1;
1880 resultIdConst->constlen = -1;
1881 resultIdConst->constvalue = PointerGetDatum(strlist_to_textarray(resultIdList));
1882 resultIdConst->constbyval = false;
1883 resultIdConst->constisnull = false;
1884 resultIdConst->location = -1;
1885
1886 return BuildReadIntermediateResultsQuery(targetEntryList, columnAliasList,
1887 resultIdConst, functionOid,
1888 useBinaryCopyFormat);
1889 }
1890
1891
1892 /*
1893 * BuildReadIntermediateResultsQuery is the common code for generating
1894 * queries to read from result files. It is used by
1895 * BuildReadIntermediateResultsArrayQuery and BuildSubPlanResultQuery.
1896 */
1897 static Query *
BuildReadIntermediateResultsQuery(List * targetEntryList,List * columnAliasList,Const * resultIdConst,Oid functionOid,bool useBinaryCopyFormat)1898 BuildReadIntermediateResultsQuery(List *targetEntryList, List *columnAliasList,
1899 Const *resultIdConst, Oid functionOid,
1900 bool useBinaryCopyFormat)
1901 {
1902 List *funcColNames = NIL;
1903 List *funcColTypes = NIL;
1904 List *funcColTypMods = NIL;
1905 List *funcColCollations = NIL;
1906 ListCell *targetEntryCell = NULL;
1907 List *targetList = NIL;
1908 int columnNumber = 1;
1909 Oid copyFormatId = BinaryCopyFormatId();
1910 int columnAliasCount = list_length(columnAliasList);
1911
1912 /* build the target list and column definition list */
1913 foreach(targetEntryCell, targetEntryList)
1914 {
1915 TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
1916 Node *targetExpr = (Node *) targetEntry->expr;
1917 char *columnName = targetEntry->resname;
1918 Oid columnType = exprType(targetExpr);
1919 Oid columnTypMod = exprTypmod(targetExpr);
1920 Oid columnCollation = exprCollation(targetExpr);
1921
1922 if (targetEntry->resjunk)
1923 {
1924 continue;
1925 }
1926
1927 funcColNames = lappend(funcColNames, makeString(columnName));
1928 funcColTypes = lappend_int(funcColTypes, columnType);
1929 funcColTypMods = lappend_int(funcColTypMods, columnTypMod);
1930 funcColCollations = lappend_int(funcColCollations, columnCollation);
1931
1932 Var *functionColumnVar = makeNode(Var);
1933 functionColumnVar->varno = 1;
1934 functionColumnVar->varattno = columnNumber;
1935 functionColumnVar->vartype = columnType;
1936 functionColumnVar->vartypmod = columnTypMod;
1937 functionColumnVar->varcollid = columnCollation;
1938 functionColumnVar->varlevelsup = 0;
1939 functionColumnVar->varnosyn = 1;
1940 functionColumnVar->varattnosyn = columnNumber;
1941 functionColumnVar->location = -1;
1942
1943 TargetEntry *newTargetEntry = makeNode(TargetEntry);
1944 newTargetEntry->expr = (Expr *) functionColumnVar;
1945 newTargetEntry->resno = columnNumber;
1946
1947 /*
1948 * Rename the column only if a column alias is defined.
1949 * Notice that column alias count could be less than actual
1950 * column count. We only use provided aliases and keep the
1951 * original column names if no alias is defined.
1952 */
1953 if (columnAliasCount >= columnNumber)
1954 {
1955 Value *columnAlias = (Value *) list_nth(columnAliasList, columnNumber - 1);
1956 Assert(IsA(columnAlias, String));
1957 newTargetEntry->resname = strVal(columnAlias);
1958 }
1959 else
1960 {
1961 newTargetEntry->resname = columnName;
1962 }
1963 newTargetEntry->resjunk = false;
1964
1965 targetList = lappend(targetList, newTargetEntry);
1966
1967 columnNumber++;
1968 }
1969
1970 /* build the citus_copy_format parameter for the call to read_intermediate_result */
1971 if (!useBinaryCopyFormat)
1972 {
1973 copyFormatId = TextCopyFormatId();
1974 }
1975
1976 Const *resultFormatConst = makeNode(Const);
1977 resultFormatConst->consttype = CitusCopyFormatTypeId();
1978 resultFormatConst->consttypmod = -1;
1979 resultFormatConst->constlen = 4;
1980 resultFormatConst->constvalue = ObjectIdGetDatum(copyFormatId);
1981 resultFormatConst->constbyval = true;
1982 resultFormatConst->constisnull = false;
1983 resultFormatConst->location = -1;
1984
1985 /* build the call to read_intermediate_result */
1986 FuncExpr *funcExpr = makeNode(FuncExpr);
1987 funcExpr->funcid = functionOid;
1988 funcExpr->funcretset = true;
1989 funcExpr->funcvariadic = false;
1990 funcExpr->funcformat = 0;
1991 funcExpr->funccollid = 0;
1992 funcExpr->inputcollid = 0;
1993 funcExpr->location = -1;
1994 funcExpr->args = list_make2(resultIdConst, resultFormatConst);
1995
1996 /* build the RTE for the call to read_intermediate_result */
1997 RangeTblFunction *rangeTableFunction = makeNode(RangeTblFunction);
1998 rangeTableFunction->funccolcount = list_length(funcColNames);
1999 rangeTableFunction->funccolnames = funcColNames;
2000 rangeTableFunction->funccoltypes = funcColTypes;
2001 rangeTableFunction->funccoltypmods = funcColTypMods;
2002 rangeTableFunction->funccolcollations = funcColCollations;
2003 rangeTableFunction->funcparams = NULL;
2004 rangeTableFunction->funcexpr = (Node *) funcExpr;
2005
2006 Alias *funcAlias = makeNode(Alias);
2007 funcAlias->aliasname = "intermediate_result";
2008 funcAlias->colnames = funcColNames;
2009
2010 RangeTblEntry *rangeTableEntry = makeNode(RangeTblEntry);
2011 rangeTableEntry->rtekind = RTE_FUNCTION;
2012 rangeTableEntry->functions = list_make1(rangeTableFunction);
2013 rangeTableEntry->inFromCl = true;
2014 rangeTableEntry->eref = funcAlias;
2015
2016 /* build the join tree using the read_intermediate_result RTE */
2017 RangeTblRef *rangeTableRef = makeNode(RangeTblRef);
2018 rangeTableRef->rtindex = 1;
2019
2020 FromExpr *joinTree = makeNode(FromExpr);
2021 joinTree->fromlist = list_make1(rangeTableRef);
2022
2023 /* build the SELECT query */
2024 Query *resultQuery = makeNode(Query);
2025 resultQuery->commandType = CMD_SELECT;
2026 resultQuery->rtable = list_make1(rangeTableEntry);
2027 resultQuery->jointree = joinTree;
2028 resultQuery->targetList = targetList;
2029
2030 return resultQuery;
2031 }
2032
2033
2034 /*
2035 * GenerateResultId generates the result ID that is used to identify an intermediate
2036 * result of the subplan with the given plan ID and subplan ID.
2037 */
2038 char *
GenerateResultId(uint64 planId,uint32 subPlanId)2039 GenerateResultId(uint64 planId, uint32 subPlanId)
2040 {
2041 StringInfo resultId = makeStringInfo();
2042
2043 appendStringInfo(resultId, UINT64_FORMAT "_%u", planId, subPlanId);
2044
2045 return resultId->data;
2046 }
2047
2048
2049 /*
2050 * GeneratingSubplans returns true if we are currently in the process of
2051 * generating subplans.
2052 */
2053 bool
GeneratingSubplans(void)2054 GeneratingSubplans(void)
2055 {
2056 return recursivePlanningDepth > 0;
2057 }
2058