1 /*-------------------------------------------------------------------------
2 *
3 * query_pushdown_planning.c
4 *
5 * Routines for creating pushdown plans for queries. Both select and modify
6 * queries can be planned using query pushdown logic passing the checks given
7 * in this file.
8 *
 * The checks determine whether the query can be sent to the worker nodes by
 * simply adding the shard id to each table name and still produce the correct
 * result, i.e. whether all the data the query needs is present on the workers.
12 *
 * For SELECT queries, Citus tries to use the query pushdown planner if the
 * query has subqueries or function RTEs. For modify queries, Citus tries to use
 * the query pushdown planner if the query accesses multiple tables.
16 *
17 * Copyright (c) Citus Data, Inc.
18 *
19 *-------------------------------------------------------------------------
20 */
21
22 #include "postgres.h"
23
24 #include "distributed/pg_version_constants.h"
25
26 #include "distributed/citus_clauses.h"
27 #include "distributed/citus_ruleutils.h"
28 #include "distributed/deparse_shard_query.h"
29 #include "distributed/listutils.h"
30 #include "distributed/metadata_cache.h"
31 #include "distributed/multi_logical_optimizer.h"
32 #include "distributed/multi_logical_planner.h"
33 #include "distributed/multi_router_planner.h"
34 #include "distributed/pg_dist_partition.h"
35 #include "distributed/query_utils.h"
36 #include "distributed/query_pushdown_planning.h"
37 #include "distributed/recursive_planning.h"
38 #include "distributed/relation_restriction_equivalence.h"
39 #include "distributed/version_compat.h"
40 #include "nodes/nodeFuncs.h"
41 #include "nodes/makefuncs.h"
42 #include "optimizer/optimizer.h"
43 #include "nodes/pg_list.h"
44 #include "optimizer/clauses.h"
45 #include "parser/parsetree.h"
46
47
/*
 * RecurringTuplesType classifies the expressions whose tuples "recur", that is,
 * that yield the same set of tuples on every shard the query is sent to. The
 * classification only exists so that the errors raised when recurring tuples
 * would lead to incorrect results can name the offending construct.
 *
 * RECURRING_TUPLES_INVALID is the "nothing recurring found" value.
 */
typedef enum RecurringTuplesType
{
	RECURRING_TUPLES_INVALID = 0,
	RECURRING_TUPLES_REFERENCE_TABLE = 1,
	RECURRING_TUPLES_FUNCTION = 2,
	RECURRING_TUPLES_EMPTY_JOIN_TREE = 3,
	RECURRING_TUPLES_RESULT_FUNCTION = 4,
	RECURRING_TUPLES_VALUES = 5
} RecurringTuplesType;
63
64
/* Config variables managed via guc.c */

/*
 * Is subquery pushdown enabled. When set, a LIMIT inside a subquery is only
 * accepted if the outermost query also has a LIMIT.
 */
bool SubqueryPushdown = false; /* is subquery pushdown enabled */

/*
 * NOTE(review): not referenced in this part of the file; presumably the VALUES
 * list size above which VALUES are materialized - confirm against its use and
 * its guc.c definition.
 */
int ValuesMaterializationThreshold = 100;

/* Forward declarations of the file-local (static) functions below */
static bool JoinTreeContainsSubqueryWalker(Node *joinTreeNode, void *context);
static bool IsFunctionOrValuesRTE(Node *node);
static bool IsOuterJoinExpr(Node *node);
static bool WindowPartitionOnDistributionColumn(Query *query);
static DeferredErrorMessage * DeferErrorIfFromClauseRecurs(Query *queryTree);
static RecurringTuplesType FromClauseRecurringTupleType(Query *queryTree);
static DeferredErrorMessage * DeferredErrorIfUnsupportedRecurringTuplesJoin(
	PlannerRestrictionContext *plannerRestrictionContext);
static DeferredErrorMessage * DeferErrorIfUnsupportedTableCombination(Query *queryTree);
static DeferredErrorMessage * DeferErrorIfSubqueryRequiresMerge(Query *subqueryTree);
static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationList);
static RecurringTuplesType FetchFirstRecurType(PlannerInfo *plannerInfo,
											   Relids relids);
static bool ContainsRecurringRTE(RangeTblEntry *rangeTableEntry,
								 RecurringTuplesType *recurType);
static bool ContainsRecurringRangeTable(List *rangeTable, RecurringTuplesType *recurType);
static bool HasRecurringTuples(Node *node, RecurringTuplesType *recurType);
static MultiNode * SubqueryPushdownMultiNodeTree(Query *queryTree);
static MultiTable * MultiSubqueryPushdownTable(Query *subquery);
static List * CreateSubqueryTargetListAndAdjustVars(List *columnList);
static AttrNumber FindResnoForVarInTargetList(List *targetList, int varno, int varattno);
static bool RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo,
													Relids relids);
static Var * PartitionColumnForPushedDownSubquery(Query *query);
94
95
96 /*
97 * ShouldUseSubqueryPushDown determines whether it's desirable to use
98 * subquery pushdown to plan the query based on the original and
99 * rewritten query.
100 */
101 bool
ShouldUseSubqueryPushDown(Query * originalQuery,Query * rewrittenQuery,PlannerRestrictionContext * plannerRestrictionContext)102 ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
103 PlannerRestrictionContext *plannerRestrictionContext)
104 {
105 /*
106 * We check the existence of subqueries in FROM clause on the modified query
107 * given that if postgres already flattened the subqueries, MultiNodeTree()
108 * can plan corresponding distributed plan.
109 */
110 if (JoinTreeContainsSubquery(rewrittenQuery))
111 {
112 return true;
113 }
114
115 /*
116 * We check the existence of subqueries in WHERE and HAVING clause on the
117 * modified query. In some cases subqueries in the original query are
118 * converted into inner joins and in those cases MultiNodeTree() can plan
119 * the rewritten plan.
120 */
121 if (WhereOrHavingClauseContainsSubquery(rewrittenQuery))
122 {
123 return true;
124 }
125
126 /*
127 * We check the existence of subqueries in the SELECT clause on the modified
128 * query.
129 */
130 if (TargetListContainsSubquery(rewrittenQuery->targetList))
131 {
132 return true;
133 }
134
135
136 /*
137 * We check if postgres planned any semi joins, MultiNodeTree doesn't
138 * support these so we fail. Postgres is able to replace some IN/ANY
139 * subqueries with semi joins and then replace those with inner joins (ones
140 * where the subquery returns unique results). This allows MultiNodeTree to
141 * execute these subqueries (because they are converted to inner joins).
142 * However, even in that case the rewrittenQuery still contains join nodes
143 * with jointype JOIN_SEMI because Postgres doesn't actually update these.
144 * The way we find out instead if it actually planned semi joins, is by
145 * checking the joins that were sent to multi_join_restriction_hook. If no
146 * joins of type JOIN_SEMI are sent it is safe to convert all JOIN_SEMI
147 * nodes to JOIN_INNER nodes (which is what is done in MultiNodeTree).
148 */
149 JoinRestrictionContext *joinRestrictionContext =
150 plannerRestrictionContext->joinRestrictionContext;
151 if (joinRestrictionContext->hasSemiJoin)
152 {
153 return true;
154 }
155
156
157 /*
158 * We process function and VALUES RTEs as subqueries, since the join order planner
159 * does not know how to handle them.
160 */
161 if (FindNodeMatchingCheckFunction((Node *) originalQuery, IsFunctionOrValuesRTE))
162 {
163 return true;
164 }
165
166 /*
167 * We handle outer joins as subqueries, since the join order planner
168 * does not know how to handle them.
169 */
170 if (FindNodeMatchingCheckFunction((Node *) originalQuery->jointree, IsOuterJoinExpr))
171 {
172 return true;
173 }
174
175 /*
176 * Original query may not have an outer join while rewritten query does.
177 * We should push down in this case.
178 * An example of this is https://github.com/citusdata/citus/issues/2739
179 * where postgres pulls-up the outer-join in the subquery.
180 */
181 if (FindNodeMatchingCheckFunction((Node *) rewrittenQuery->jointree, IsOuterJoinExpr))
182 {
183 return true;
184 }
185
186 /*
187 * Some unsupported join clauses in logical planner
188 * may be supported by subquery pushdown planner.
189 */
190 List *qualifierList = QualifierList(rewrittenQuery->jointree);
191 if (DeferErrorIfUnsupportedClause(qualifierList) != NULL)
192 {
193 return true;
194 }
195
196 /* check if the query has a window function and it is safe to pushdown */
197 if (originalQuery->hasWindowFuncs &&
198 SafeToPushdownWindowFunction(originalQuery, NULL))
199 {
200 return true;
201 }
202
203 return false;
204 }
205
206
207 /*
208 * JoinTreeContainsSubquery returns true if the input query contains any subqueries
209 * in the join tree (e.g., FROM clause).
210 */
211 bool
JoinTreeContainsSubquery(Query * query)212 JoinTreeContainsSubquery(Query *query)
213 {
214 FromExpr *joinTree = query->jointree;
215
216 if (!joinTree)
217 {
218 return false;
219 }
220
221 return JoinTreeContainsSubqueryWalker((Node *) joinTree, query);
222 }
223
224
225 /*
226 * HasEmptyJoinTree returns whether the query selects from anything.
227 */
228 bool
HasEmptyJoinTree(Query * query)229 HasEmptyJoinTree(Query *query)
230 {
231 if (query->rtable == NIL)
232 {
233 return true;
234 }
235 else if (list_length(query->rtable) == 1)
236 {
237 RangeTblEntry *rte = (RangeTblEntry *) linitial(query->rtable);
238 if (rte->rtekind == RTE_RESULT)
239 {
240 return true;
241 }
242 }
243
244 return false;
245 }
246
247
248 /*
249 * JoinTreeContainsSubqueryWalker returns true if the input joinTreeNode
250 * references to a subquery. Otherwise, recurses into the expression.
251 */
252 static bool
JoinTreeContainsSubqueryWalker(Node * joinTreeNode,void * context)253 JoinTreeContainsSubqueryWalker(Node *joinTreeNode, void *context)
254 {
255 if (joinTreeNode == NULL)
256 {
257 return false;
258 }
259
260 if (IsA(joinTreeNode, RangeTblRef))
261 {
262 Query *query = (Query *) context;
263
264 RangeTblRef *rangeTableRef = (RangeTblRef *) joinTreeNode;
265 RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableRef->rtindex, query->rtable);
266
267 if (rangeTableEntry->rtekind == RTE_SUBQUERY)
268 {
269 return true;
270 }
271
272 return false;
273 }
274
275 return expression_tree_walker(joinTreeNode, JoinTreeContainsSubqueryWalker, context);
276 }
277
278
279 /*
280 * WhereOrHavingClauseContainsSubquery returns true if the input query contains
281 * any subqueries in the WHERE or HAVING clause.
282 */
283 bool
WhereOrHavingClauseContainsSubquery(Query * query)284 WhereOrHavingClauseContainsSubquery(Query *query)
285 {
286 if (FindNodeMatchingCheckFunction(query->havingQual, IsNodeSubquery))
287 {
288 return true;
289 }
290
291 if (!query->jointree)
292 {
293 return false;
294 }
295
296 /*
297 * We search the whole jointree here, not just the quals. The reason for
298 * this is that the fromlist can contain other FromExpr nodes again or
299 * JoinExpr nodes that also have quals. If that's the case we need to check
300 * those as well if they contain andy subqueries.
301 */
302 return FindNodeMatchingCheckFunction((Node *) query->jointree, IsNodeSubquery);
303 }
304
305
306 /*
307 * TargetList returns true if the input query contains
308 * any subqueries in the WHERE clause.
309 */
310 bool
TargetListContainsSubquery(List * targetList)311 TargetListContainsSubquery(List *targetList)
312 {
313 return FindNodeMatchingCheckFunction((Node *) targetList, IsNodeSubquery);
314 }
315
316
317 /*
318 * IsFunctionRTE determines whether the given node is a function RTE.
319 */
320 static bool
IsFunctionOrValuesRTE(Node * node)321 IsFunctionOrValuesRTE(Node *node)
322 {
323 if (IsA(node, RangeTblEntry))
324 {
325 RangeTblEntry *rangeTblEntry = (RangeTblEntry *) node;
326
327 if (rangeTblEntry->rtekind == RTE_FUNCTION ||
328 rangeTblEntry->rtekind == RTE_VALUES)
329 {
330 return true;
331 }
332 }
333
334 return false;
335 }
336
337
338 /*
339 * IsNodeSubquery returns true if the given node is a Query or SubPlan or a
340 * Param node with paramkind PARAM_EXEC.
341 *
342 * The check for SubPlan is needed when this is used on a already rewritten
343 * query. Such a query has SubPlan nodes instead of SubLink nodes (which
344 * contain a Query node).
345 * The check for PARAM_EXEC is needed because some very simple subqueries like
346 * (select 1) are converted to init plans in the rewritten query. In this case
347 * the only thing left in the query tree is a Param node with type PARAM_EXEC.
348 */
349 bool
IsNodeSubquery(Node * node)350 IsNodeSubquery(Node *node)
351 {
352 if (node == NULL)
353 {
354 return false;
355 }
356
357 if (IsA(node, Query) || IsA(node, SubPlan))
358 {
359 return true;
360 }
361
362 if (!IsA(node, Param))
363 {
364 return false;
365 }
366 return ((Param *) node)->paramkind == PARAM_EXEC;
367 }
368
369
370 /*
371 * IsOuterJoinExpr returns whether the given node is an outer join expression.
372 */
373 static bool
IsOuterJoinExpr(Node * node)374 IsOuterJoinExpr(Node *node)
375 {
376 bool isOuterJoin = false;
377
378 if (node == NULL)
379 {
380 return false;
381 }
382
383 if (IsA(node, JoinExpr))
384 {
385 JoinExpr *joinExpr = (JoinExpr *) node;
386 JoinType joinType = joinExpr->jointype;
387 if (IS_OUTER_JOIN(joinType))
388 {
389 isOuterJoin = true;
390 }
391 }
392
393 return isOuterJoin;
394 }
395
396
397 /*
398 * SafeToPushdownWindowFunction checks if the query with window function is supported.
399 * Returns the result accordingly and modifies errorDetail if non null.
400 */
401 bool
SafeToPushdownWindowFunction(Query * query,StringInfo * errorDetail)402 SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail)
403 {
404 ListCell *windowClauseCell = NULL;
405 List *windowClauseList = query->windowClause;
406
407 /*
408 * We need to check each window clause separately if there is a partition by clause
409 * and if it is partitioned on the distribution column.
410 */
411 foreach(windowClauseCell, windowClauseList)
412 {
413 WindowClause *windowClause = lfirst(windowClauseCell);
414
415 if (!windowClause->partitionClause)
416 {
417 if (errorDetail)
418 {
419 *errorDetail = makeStringInfo();
420 appendStringInfoString(*errorDetail,
421 "Window functions without PARTITION BY on distribution "
422 "column is currently unsupported");
423 }
424 return false;
425 }
426 }
427
428 if (!WindowPartitionOnDistributionColumn(query))
429 {
430 if (errorDetail)
431 {
432 *errorDetail = makeStringInfo();
433 appendStringInfoString(*errorDetail,
434 "Window functions with PARTITION BY list missing distribution "
435 "column is currently unsupported");
436 }
437 return false;
438 }
439
440 return true;
441 }
442
443
444 /*
445 * WindowPartitionOnDistributionColumn checks if the given subquery has one
446 * or more window functions and at least one of them is not partitioned by
447 * distribution column. The function returns false if your window function does not
448 * have a partition by clause or it does not include the distribution column.
449 *
450 * Please note that if the query does not have a window function, the function
451 * returns true.
452 */
453 static bool
WindowPartitionOnDistributionColumn(Query * query)454 WindowPartitionOnDistributionColumn(Query *query)
455 {
456 List *windowClauseList = query->windowClause;
457 ListCell *windowClauseCell = NULL;
458
459 foreach(windowClauseCell, windowClauseList)
460 {
461 WindowClause *windowClause = lfirst(windowClauseCell);
462 List *partitionClauseList = windowClause->partitionClause;
463 List *targetEntryList = query->targetList;
464
465 List *groupTargetEntryList =
466 GroupTargetEntryList(partitionClauseList, targetEntryList);
467
468 bool partitionOnDistributionColumn =
469 TargetListOnPartitionColumn(query, groupTargetEntryList);
470
471 if (!partitionOnDistributionColumn)
472 {
473 return false;
474 }
475 }
476
477 return true;
478 }
479
480
481 /*
482 * SubqueryMultiNodeTree gets the query objects and returns logical plan
483 * for subqueries.
484 *
485 * We currently have two different code paths for creating logic plan for subqueries:
486 * (i) subquery pushdown
487 * (ii) single relation repartition subquery
488 *
489 * In order to create the logical plan, we follow the algorithm below:
490 * - If subquery pushdown planner can plan the query
491 * - We're done, we create the multi plan tree and return
492 * - Else
493 * - If the query is not eligible for single table repartition subquery planning
494 * - Throw the error that the subquery pushdown planner generated
495 * - If it is eligible for single table repartition subquery planning
496 * - Check for the errors for single table repartition subquery planning
497 * - If no errors found, we're done. Create the multi plan and return
498 * - If found errors, throw it
499 */
500 MultiNode *
SubqueryMultiNodeTree(Query * originalQuery,Query * queryTree,PlannerRestrictionContext * plannerRestrictionContext)501 SubqueryMultiNodeTree(Query *originalQuery, Query *queryTree,
502 PlannerRestrictionContext *plannerRestrictionContext)
503 {
504 MultiNode *multiQueryNode = NULL;
505
506 /*
507 * This is a generic error check that applies to both subquery pushdown
508 * and single table repartition subquery.
509 */
510 DeferredErrorMessage *unsupportedQueryError = DeferErrorIfQueryNotSupported(
511 originalQuery);
512 if (unsupportedQueryError != NULL)
513 {
514 RaiseDeferredError(unsupportedQueryError, ERROR);
515 }
516
517 /*
518 * In principle, we're first trying subquery pushdown planner. If it fails
519 * to create a logical plan, continue with trying the single table
520 * repartition subquery planning.
521 */
522 DeferredErrorMessage *subqueryPushdownError = DeferErrorIfUnsupportedSubqueryPushdown(
523 originalQuery,
524 plannerRestrictionContext);
525 if (!subqueryPushdownError)
526 {
527 multiQueryNode = SubqueryPushdownMultiNodeTree(originalQuery);
528 }
529 else if (subqueryPushdownError)
530 {
531 RaiseDeferredErrorInternal(subqueryPushdownError, ERROR);
532
533 List *subqueryEntryList = SubqueryEntryList(queryTree);
534 RangeTblEntry *subqueryRangeTableEntry = (RangeTblEntry *) linitial(
535 subqueryEntryList);
536 Assert(subqueryRangeTableEntry->rtekind == RTE_SUBQUERY);
537
538 Query *subqueryTree = subqueryRangeTableEntry->subquery;
539
540 DeferredErrorMessage *repartitionQueryError =
541 DeferErrorIfUnsupportedSubqueryRepartition(subqueryTree);
542 if (repartitionQueryError)
543 {
544 RaiseDeferredErrorInternal(repartitionQueryError, ERROR);
545 }
546
547 /* all checks have passed, safe to create the multi plan */
548 multiQueryNode = MultiNodeTree(queryTree);
549 }
550
551 Assert(multiQueryNode != NULL);
552
553 return multiQueryNode;
554 }
555
556
557 /*
558 * DeferErrorIfContainsUnsupportedSubqueryPushdown iterates on the query's subquery
559 * entry list and uses helper functions to check if we can push down subquery
560 * to worker nodes. These helper functions returns a deferred error if we
561 * cannot push down the subquery.
562 */
563 DeferredErrorMessage *
DeferErrorIfUnsupportedSubqueryPushdown(Query * originalQuery,PlannerRestrictionContext * plannerRestrictionContext)564 DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
565 PlannerRestrictionContext *
566 plannerRestrictionContext)
567 {
568 bool outerMostQueryHasLimit = false;
569 ListCell *subqueryCell = NULL;
570 List *subqueryList = NIL;
571
572 if (originalQuery->limitCount != NULL)
573 {
574 outerMostQueryHasLimit = true;
575 }
576
577 /*
578 * We're checking two things here:
579 * (i) If the query contains a top level union, ensure that all leaves
580 * return the partition key at the same position
581 * (ii) Else, check whether all relations joined on the partition key or not
582 */
583 if (ContainsUnionSubquery(originalQuery))
584 {
585 if (!SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext))
586 {
587 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
588 "cannot pushdown the subquery since not all subqueries "
589 "in the UNION have the partition column in the same "
590 "position",
591 "Each leaf query of the UNION should return the "
592 "partition column in the same position and all joins "
593 "must be on the partition column",
594 NULL);
595 }
596 }
597 else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext))
598 {
599 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
600 "complex joins are only supported when all distributed tables are "
601 "co-located and joined on their distribution columns",
602 NULL, NULL);
603 }
604
605 /* we shouldn't allow reference tables in the FROM clause when the query has sublinks */
606 DeferredErrorMessage *error = DeferErrorIfFromClauseRecurs(originalQuery);
607 if (error)
608 {
609 return error;
610 }
611
612 /* we shouldn't allow reference tables in the outer part of outer joins */
613 error = DeferredErrorIfUnsupportedRecurringTuplesJoin(plannerRestrictionContext);
614 if (error)
615 {
616 return error;
617 }
618
619 /*
620 * We first extract all the queries that appear in the original query. Later,
621 * we delete the original query given that error rules does not apply to the
622 * top level query. For instance, we could support any LIMIT/ORDER BY on the
623 * top level query.
624 */
625 ExtractQueryWalker((Node *) originalQuery, &subqueryList);
626 subqueryList = list_delete(subqueryList, originalQuery);
627
628 /* iterate on the subquery list and error out accordingly */
629 foreach(subqueryCell, subqueryList)
630 {
631 Query *subquery = lfirst(subqueryCell);
632 error = DeferErrorIfCannotPushdownSubquery(subquery,
633 outerMostQueryHasLimit);
634 if (error)
635 {
636 return error;
637 }
638 }
639
640 return NULL;
641 }
642
643
644 /*
645 * DeferErrorIfFromClauseRecurs returns a deferred error if the
646 * given query is not suitable for subquery pushdown.
647 *
648 * While planning sublinks, we rely on Postgres in the sense that it converts some of
649 * sublinks into joins.
650 *
651 * In some cases, sublinks are pulled up and converted into outer joins. Those cases
652 * are already handled with DeferredErrorIfUnsupportedRecurringTuplesJoin().
653 *
654 * If the sublinks are not pulled up, we should still error out in if the expression
655 * in the FROM clause would recur for every shard in a subquery on the WHERE clause.
656 *
657 * Otherwise, the result would include duplicate rows.
658 */
659 static DeferredErrorMessage *
DeferErrorIfFromClauseRecurs(Query * queryTree)660 DeferErrorIfFromClauseRecurs(Query *queryTree)
661 {
662 if (!queryTree->hasSubLinks)
663 {
664 return NULL;
665 }
666
667 RecurringTuplesType recurType = FromClauseRecurringTupleType(queryTree);
668 if (recurType == RECURRING_TUPLES_REFERENCE_TABLE)
669 {
670 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
671 "correlated subqueries are not supported when "
672 "the FROM clause contains a reference table", NULL, NULL);
673 }
674 else if (recurType == RECURRING_TUPLES_FUNCTION)
675 {
676 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
677 "correlated subqueries are not supported when "
678 "the FROM clause contains a set returning function", NULL,
679 NULL);
680 }
681 else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
682 {
683 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
684 "correlated subqueries are not supported when "
685 "the FROM clause contains a CTE or subquery", NULL, NULL);
686 }
687 else if (recurType == RECURRING_TUPLES_EMPTY_JOIN_TREE)
688 {
689 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
690 "correlated subqueries are not supported when "
691 "the FROM clause contains a subquery without FROM", NULL,
692 NULL);
693 }
694 else if (recurType == RECURRING_TUPLES_VALUES)
695 {
696 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
697 "correlated subqueries are not supported when "
698 "the FROM clause contains VALUES", NULL,
699 NULL);
700 }
701
702
703 /*
704 * We get here when there is neither a distributed table, nor recurring tuples.
705 * That usually means that there isn't a FROM at all (only sublinks), this
706 * implies that queryTree is recurring, but whether this is a problem depends
707 * on outer queries, not on queryTree itself.
708 */
709
710 return NULL;
711 }
712
713
714 /*
715 * FromClauseRecurringTupleType returns tuple recurrence information
716 * in query result based on range table entries in from clause.
717 *
718 * Returned information is used to prepare appropriate deferred error
719 * message for subquery pushdown checks.
720 */
721 static RecurringTuplesType
FromClauseRecurringTupleType(Query * queryTree)722 FromClauseRecurringTupleType(Query *queryTree)
723 {
724 RecurringTuplesType recurType = RECURRING_TUPLES_INVALID;
725
726 if (HasEmptyJoinTree(queryTree))
727 {
728 return RECURRING_TUPLES_EMPTY_JOIN_TREE;
729 }
730
731 if (FindNodeMatchingCheckFunctionInRangeTableList(queryTree->rtable,
732 IsDistributedTableRTE))
733 {
734 /*
735 * There is a distributed table somewhere in the FROM clause.
736 *
737 * In the typical case this means that the query does not recur,
738 * but there are two exceptions:
739 *
740 * - outer joins such as reference_table LEFT JOIN distributed_table
741 * - FROM reference_table WHERE .. (SELECT .. FROM distributed_table) ..
742 *
743 * However, we check all subqueries and joins separately, so we would
744 * find such conditions in other calls.
745 */
746 return RECURRING_TUPLES_INVALID;
747 }
748
749 /*
750 * Try to figure out which type of recurring tuples we have to produce a
751 * relevant error message. If there are several we'll pick the first one.
752 */
753 ContainsRecurringRangeTable(queryTree->rtable, &recurType);
754
755 return recurType;
756 }
757
758
759 /*
760 * DeferredErrorIfUnsupportedRecurringTuplesJoin returns true if there exists a outer join
761 * between reference table and distributed tables which does not follow
762 * the rules :
763 * - Reference tables can not be located in the outer part of the semi join or the
764 * anti join. Otherwise, we may have duplicate results. Although getting duplicate
765 * results is not possible by checking the equality on the column of the reference
766 * table and partition column of distributed table, we still keep these checks.
767 * Because, using the reference table in the outer part of the semi join or anti
768 * join is not very common.
769 * - Reference tables can not be located in the outer part of the left join
770 * (Note that PostgreSQL converts right joins to left joins. While converting
771 * join types, innerrel and outerrel are also switched.) Otherwise we will
772 * definitely have duplicate rows. Beside, reference tables can not be used
773 * with full outer joins because of the same reason.
774 */
775 static DeferredErrorMessage *
DeferredErrorIfUnsupportedRecurringTuplesJoin(PlannerRestrictionContext * plannerRestrictionContext)776 DeferredErrorIfUnsupportedRecurringTuplesJoin(
777 PlannerRestrictionContext *plannerRestrictionContext)
778 {
779 List *joinRestrictionList =
780 plannerRestrictionContext->joinRestrictionContext->joinRestrictionList;
781 ListCell *joinRestrictionCell = NULL;
782 RecurringTuplesType recurType = RECURRING_TUPLES_INVALID;
783 foreach(joinRestrictionCell, joinRestrictionList)
784 {
785 JoinRestriction *joinRestriction = (JoinRestriction *) lfirst(
786 joinRestrictionCell);
787 JoinType joinType = joinRestriction->joinType;
788 PlannerInfo *plannerInfo = joinRestriction->plannerInfo;
789 Relids innerrelRelids = joinRestriction->innerrelRelids;
790 Relids outerrelRelids = joinRestriction->outerrelRelids;
791
792 if (joinType == JOIN_SEMI || joinType == JOIN_ANTI || joinType == JOIN_LEFT)
793 {
794 /*
795 * If there are only recurring tuples on the inner side of a join then
796 * we can push it down, regardless of whether the outer side is
797 * recurring or not. Otherwise, we check the outer side for recurring
798 * tuples.
799 */
800 if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, innerrelRelids))
801 {
802 continue;
803 }
804
805
806 /*
807 * If the outer side of the join doesn't have any distributed tables
808 * (e.g., contains only recurring tuples), Citus should not pushdown
809 * the query. The reason is that recurring tuples on every shard would
810 * be added to the result, which is wrong.
811 */
812 if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, outerrelRelids))
813 {
814 /*
815 * Find the first (or only) recurring RTE to give a meaningful
816 * error to the user.
817 */
818 recurType = FetchFirstRecurType(plannerInfo, outerrelRelids);
819
820 break;
821 }
822 }
823 else if (joinType == JOIN_FULL)
824 {
825 if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, innerrelRelids))
826 {
827 /*
828 * Find the first (or only) recurring RTE to give a meaningful
829 * error to the user.
830 */
831 recurType = FetchFirstRecurType(plannerInfo, innerrelRelids);
832
833 break;
834 }
835
836 if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, outerrelRelids))
837 {
838 /*
839 * Find the first (or only) recurring RTE to give a meaningful
840 * error to the user.
841 */
842 recurType = FetchFirstRecurType(plannerInfo, outerrelRelids);
843
844 break;
845 }
846 }
847 }
848
849 if (recurType == RECURRING_TUPLES_REFERENCE_TABLE)
850 {
851 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
852 "cannot pushdown the subquery",
853 "There exist a reference table in the outer "
854 "part of the outer join", NULL);
855 }
856 else if (recurType == RECURRING_TUPLES_FUNCTION)
857 {
858 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
859 "cannot pushdown the subquery",
860 "There exist a table function in the outer "
861 "part of the outer join", NULL);
862 }
863 else if (recurType == RECURRING_TUPLES_EMPTY_JOIN_TREE)
864 {
865 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
866 "cannot pushdown the subquery",
867 "There exist a subquery without FROM in the outer "
868 "part of the outer join", NULL);
869 }
870 else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
871 {
872 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
873 "cannot pushdown the subquery",
874 "Complex subqueries, CTEs and local tables cannot be in "
875 "the outer part of an outer join with a distributed table",
876 NULL);
877 }
878 else if (recurType == RECURRING_TUPLES_VALUES)
879 {
880 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
881 "cannot pushdown the subquery",
882 "There exist a VALUES clause in the outer "
883 "part of the outer join", NULL);
884 }
885
886 return NULL;
887 }
888
889
890 /*
891 * CanPushdownSubquery checks if we can push down the given
892 * subquery to worker nodes.
893 */
894 bool
CanPushdownSubquery(Query * subqueryTree,bool outerMostQueryHasLimit)895 CanPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLimit)
896 {
897 return DeferErrorIfCannotPushdownSubquery(subqueryTree, outerMostQueryHasLimit) ==
898 NULL;
899 }
900
901
/*
 * DeferErrorIfCannotPushdownSubquery checks if we can push down the given
 * subquery to worker nodes. If we cannot push down the subquery, this function
 * returns a deferred error; otherwise it returns NULL.
 *
 * The checks performed here are:
 * a. The range table entries in the join tree must be a supported combination
 *    (see DeferErrorIfUnsupportedTableCombination).
 * b. A subquery without a FROM clause may only contain immutable functions in
 *    its target list.
 * c. A subquery that does not reference the outer query must not require a
 *    merge step (see DeferErrorIfSubqueryRequiresMerge), e.g. GROUP BY / DISTINCT
 *    must include the partition column.
 * d. A LIMIT in the subquery (with citus.subquery_pushdown enabled) requires a
 *    LIMIT in the outermost query.
 * e. Set operations must be supported UNIONs (see
 *    DeferErrorIfUnsupportedUnionQuery).
 * f. Recursive queries, CTEs, FOR UPDATE/SHARE and GROUPING SETS are rejected.
 * g. The FROM clause must pass DeferErrorIfFromClauseRecurs.
 *
 * Errors from helper functions are returned immediately. The remaining checks
 * only record an error detail; when several of them fail, the detail of the
 * last failing check (in statement order) is the one reported.
 *
 * This function is very similar to DeferErrorIfQueryNotSupported() in logical
 * planner, but we don't reuse it, because differently for subqueries we support
 * a subset of distinct, union and left joins.
 *
 * Note that this list of checks is not exhaustive, there can be some cases
 * in which we let a subquery run but the returned results would be wrong. For
 * example, if a subquery has a group by on another subquery which includes
 * order by with limit, we let this query run, but results could be wrong
 * depending on the features of underlying tables.
 */
DeferredErrorMessage *
DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLimit)
{
	/* set to false by any failing non-returning check below */
	bool preconditionsSatisfied = true;

	/* detail of the most recent failing non-returning check */
	char *errorDetail = NULL;

	DeferredErrorMessage *deferredError = DeferErrorIfUnsupportedTableCombination(
		subqueryTree);
	if (deferredError)
	{
		return deferredError;
	}

	if (HasEmptyJoinTree(subqueryTree) &&
		contain_mutable_functions((Node *) subqueryTree->targetList))
	{
		preconditionsSatisfied = false;
		errorDetail = "Subqueries without a FROM clause can only contain immutable "
					  "functions";
	}

	/*
	 * Correlated subqueries are effectively functions that are repeatedly called
	 * for the values of the vars that point to the outer query. We can liberally
	 * push down SQL features within such a function, as long as co-located join
	 * checks are applied.
	 */
	if (!ContainsReferencesToOuterQuery(subqueryTree))
	{
		deferredError = DeferErrorIfSubqueryRequiresMerge(subqueryTree);
		if (deferredError)
		{
			return deferredError;
		}
	}

	/*
	 * Limit is partially supported when SubqueryPushdown is set.
	 * The outermost query must have a limit clause.
	 */
	if (subqueryTree->limitCount && SubqueryPushdown && !outerMostQueryHasLimit)
	{
		preconditionsSatisfied = false;
		errorDetail = "Limit in subquery without limit in the outermost query is "
					  "unsupported";
	}

	if (subqueryTree->setOperations)
	{
		deferredError = DeferErrorIfUnsupportedUnionQuery(subqueryTree);
		if (deferredError)
		{
			return deferredError;
		}
	}

	if (subqueryTree->hasRecursive)
	{
		preconditionsSatisfied = false;
		errorDetail = "Recursive queries are currently unsupported";
	}

	if (subqueryTree->cteList)
	{
		preconditionsSatisfied = false;
		errorDetail = "Common Table Expressions are currently unsupported";
	}

	if (subqueryTree->hasForUpdate)
	{
		preconditionsSatisfied = false;
		errorDetail = "For Update/Share commands are currently unsupported";
	}

	/* grouping sets are not allowed in subqueries*/
	if (subqueryTree->groupingSets)
	{
		preconditionsSatisfied = false;
		errorDetail = "could not run distributed query with GROUPING SETS, CUBE, "
					  "or ROLLUP";
	}

	/* a recurring FROM clause error takes precedence over the details above */
	deferredError = DeferErrorIfFromClauseRecurs(subqueryTree);
	if (deferredError)
	{
		return deferredError;
	}


	/* finally check and return deferred if not satisfied */
	if (!preconditionsSatisfied)
	{
		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
							 "cannot push down this subquery",
							 errorDetail, NULL);
	}

	return NULL;
}
1023
1024
/*
 * DeferErrorIfSubqueryRequiresMerge returns a deferred error if the subquery
 * requires a merge step on the coordinator (e.g. limit, group by non-distribution
 * column, etc.), and NULL otherwise.
 *
 * The rejected constructs are: OFFSET; LIMIT when citus.subquery_pushdown is
 * off; GROUP BY or DISTINCT lists that do not include the partition column;
 * aggregates or HAVING without any GROUP BY; and window functions that
 * SafeToPushdownWindowFunction deems unsafe.
 *
 * Every check runs. If several fail, the error detail of the last failing check
 * (in statement order) is the one reported.
 */
static DeferredErrorMessage *
DeferErrorIfSubqueryRequiresMerge(Query *subqueryTree)
{
	bool preconditionsSatisfied = true;
	char *errorDetail = NULL;

	if (subqueryTree->limitOffset)
	{
		preconditionsSatisfied = false;
		errorDetail = "Offset clause is currently unsupported when a subquery "
					  "references a column from another query";
	}

	/* limit is not supported when SubqueryPushdown is not set */
	if (subqueryTree->limitCount && !SubqueryPushdown)
	{
		preconditionsSatisfied = false;
		errorDetail = "Limit in subquery is currently unsupported when a "
					  "subquery references a column from another query";
	}

	/* group clause list must include partition column */
	if (subqueryTree->groupClause)
	{
		List *groupClauseList = subqueryTree->groupClause;
		List *targetEntryList = subqueryTree->targetList;
		List *groupTargetEntryList = GroupTargetEntryList(groupClauseList,
														  targetEntryList);
		bool groupOnPartitionColumn =
			TargetListOnPartitionColumn(subqueryTree, groupTargetEntryList);
		if (!groupOnPartitionColumn)
		{
			preconditionsSatisfied = false;
			errorDetail = "Group by list without partition column is currently "
						  "unsupported when a subquery references a column "
						  "from another query";
		}
	}

	/* we don't support aggregates without group by */
	if (subqueryTree->hasAggs && (subqueryTree->groupClause == NULL))
	{
		preconditionsSatisfied = false;
		errorDetail = "Aggregates without group by are currently unsupported "
					  "when a subquery references a column from another query";
	}

	/*
	 * HAVING without any GROUP BY is not supported. A GROUP BY that lacks the
	 * partition column has already been rejected by the group clause check above.
	 */
	if (subqueryTree->havingQual && (subqueryTree->groupClause == NULL))
	{
		preconditionsSatisfied = false;
		errorDetail = "Having qual without group by on partition column is "
					  "currently unsupported when a subquery references "
					  "a column from another query";
	}

	/*
	 * We support window functions when the window function
	 * is partitioned on distribution column. SafeToPushdownWindowFunction is
	 * expected to fill errorInfo whenever it returns false.
	 */
	StringInfo errorInfo = NULL;
	if (subqueryTree->hasWindowFuncs && !SafeToPushdownWindowFunction(subqueryTree,
																	  &errorInfo))
	{
		errorDetail = (char *) errorInfo->data;
		preconditionsSatisfied = false;
	}

	/* distinct clause list must include partition column */
	if (subqueryTree->distinctClause)
	{
		List *distinctClauseList = subqueryTree->distinctClause;
		List *targetEntryList = subqueryTree->targetList;
		List *distinctTargetEntryList = GroupTargetEntryList(distinctClauseList,
															 targetEntryList);
		bool distinctOnPartitionColumn =
			TargetListOnPartitionColumn(subqueryTree, distinctTargetEntryList);
		if (!distinctOnPartitionColumn)
		{
			preconditionsSatisfied = false;
			errorDetail = "Distinct on columns without partition column is "
						  "currently unsupported";
		}
	}

	/* finally check and return deferred if not satisfied */
	if (!preconditionsSatisfied)
	{
		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
							 "cannot push down this subquery",
							 errorDetail, NULL);
	}

	return NULL;
}
1125
1126
/*
 * DeferErrorIfUnsupportedTableCombination checks if the given query tree contains any
 * unsupported range table entries in its join tree. It returns a deferred error
 * if one is found, and NULL otherwise.
 *
 * Accepted entries are relations, subqueries, RTE_RESULT entries, VALUES lists
 * (when not above citus.values_materialization_threshold and free of mutable
 * functions), and function RTEs that are either a single
 * read_intermediate_result call or contain only immutable functions.
 *
 * A CTE entry, or any other kind of entry, stops the scan immediately. Rejected
 * VALUES and function entries do not stop it, so a later rejected entry can
 * replace their error detail.
 */
static DeferredErrorMessage *
DeferErrorIfUnsupportedTableCombination(Query *queryTree)
{
	List *rangeTableList = queryTree->rtable;
	List *joinTreeTableIndexList = NIL;
	int joinTreeTableIndex = 0;
	bool unsupportedTableCombination = false;
	char *errorDetail = NULL;

	/*
	 * Extract all range table indexes from the join tree. Note that sub-queries
	 * that get pulled up by PostgreSQL don't appear in this join tree.
	 */
	ExtractRangeTableIndexWalker((Node *) queryTree->jointree,
								 &joinTreeTableIndexList);

	foreach_int(joinTreeTableIndex, joinTreeTableIndexList)
	{
		/*
		 * Join tree's range table index starts from 1 in the query tree. But,
		 * list indexes start from 0.
		 */
		int rangeTableListIndex = joinTreeTableIndex - 1;

		RangeTblEntry *rangeTableEntry =
			(RangeTblEntry *) list_nth(rangeTableList, rangeTableListIndex);

		/*
		 * Check if the range table in the join tree is a simple relation, a
		 * subquery, or immutable function.
		 */
		if (rangeTableEntry->rtekind == RTE_RELATION ||
			rangeTableEntry->rtekind == RTE_SUBQUERY ||
			rangeTableEntry->rtekind == RTE_RESULT)
		{
			/* accepted */
		}
		else if (rangeTableEntry->rtekind == RTE_VALUES)
		{
			/*
			 * When GUC is set to -1, we disable materialization, when set to 0,
			 * we materialize everything. Other values are compared against the
			 * length of the values_lists.
			 */
			int valuesRowCount = list_length(rangeTableEntry->values_lists);
			if (ValuesMaterializationThreshold >= 0 &&
				valuesRowCount > ValuesMaterializationThreshold)
			{
				unsupportedTableCombination = true;
				errorDetail = "VALUES has more than "
							  "\"citus.values_materialization_threshold\" "
							  "entries, so it is materialized";
			}
			else if (contain_mutable_functions((Node *) rangeTableEntry->values_lists))
			{
				/* VALUES should not contain mutable functions */
				unsupportedTableCombination = true;
				errorDetail = "Only immutable functions can be used in VALUES";
			}
		}
		else if (rangeTableEntry->rtekind == RTE_FUNCTION)
		{
			List *functionList = rangeTableEntry->functions;

			if (list_length(functionList) == 1 &&
				ContainsReadIntermediateResultFunction(linitial(functionList)))
			{
				/*
				 * The read_intermediate_result function is volatile, but we know
				 * it has the same result across all nodes and can therefore treat
				 * it as a reference table.
				 */
			}
			else if (contain_mutable_functions((Node *) functionList))
			{
				unsupportedTableCombination = true;
				errorDetail = "Only immutable functions can be used as a table "
							  "expressions in a multi-shard query";
			}
			else
			{
				/* immutable function RTEs are treated as reference tables */
			}
		}
		else if (rangeTableEntry->rtekind == RTE_CTE)
		{
			unsupportedTableCombination = true;
			errorDetail = "CTEs in subqueries are currently unsupported";
			break;
		}
		else
		{
			/* e.g. join RTEs or other entry kinds not handled above */
			unsupportedTableCombination = true;
			errorDetail = "Table expressions other than relations, subqueries, "
						  "and immutable functions are currently unsupported";
			break;
		}
	}

	/* finally check and error out if not satisfied */
	if (unsupportedTableCombination)
	{
		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
							 "cannot push down this subquery",
							 errorDetail, NULL);
	}

	return NULL;
}
1244
1245
1246 /*
1247 * DeferErrorIfUnsupportedUnionQuery is a helper function for ErrorIfCannotPushdownSubquery().
1248 * The function also errors out for set operations INTERSECT and EXCEPT.
1249 */
1250 DeferredErrorMessage *
DeferErrorIfUnsupportedUnionQuery(Query * subqueryTree)1251 DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree)
1252 {
1253 List *setOperationStatementList = NIL;
1254 ListCell *setOperationStatmentCell = NULL;
1255 RecurringTuplesType recurType = RECURRING_TUPLES_INVALID;
1256
1257 ExtractSetOperationStatmentWalker((Node *) subqueryTree->setOperations,
1258 &setOperationStatementList);
1259 foreach(setOperationStatmentCell, setOperationStatementList)
1260 {
1261 SetOperationStmt *setOperation =
1262 (SetOperationStmt *) lfirst(setOperationStatmentCell);
1263 Node *leftArg = setOperation->larg;
1264 Node *rightArg = setOperation->rarg;
1265 int leftArgRTI = 0;
1266 int rightArgRTI = 0;
1267
1268 if (setOperation->op != SETOP_UNION)
1269 {
1270 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1271 "cannot push down this subquery",
1272 "Intersect and Except are currently unsupported",
1273 NULL);
1274 }
1275
1276 if (IsA(leftArg, RangeTblRef))
1277 {
1278 leftArgRTI = ((RangeTblRef *) leftArg)->rtindex;
1279 Query *leftArgSubquery = rt_fetch(leftArgRTI,
1280 subqueryTree->rtable)->subquery;
1281 recurType = FromClauseRecurringTupleType(leftArgSubquery);
1282 if (recurType != RECURRING_TUPLES_INVALID)
1283 {
1284 break;
1285 }
1286 }
1287
1288 if (IsA(rightArg, RangeTblRef))
1289 {
1290 rightArgRTI = ((RangeTblRef *) rightArg)->rtindex;
1291 Query *rightArgSubquery = rt_fetch(rightArgRTI,
1292 subqueryTree->rtable)->subquery;
1293 recurType = FromClauseRecurringTupleType(rightArgSubquery);
1294 if (recurType != RECURRING_TUPLES_INVALID)
1295 {
1296 break;
1297 }
1298 }
1299 }
1300
1301 if (recurType == RECURRING_TUPLES_REFERENCE_TABLE)
1302 {
1303 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1304 "cannot push down this subquery",
1305 "Reference tables are not supported with union operator",
1306 NULL);
1307 }
1308 else if (recurType == RECURRING_TUPLES_FUNCTION)
1309 {
1310 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1311 "cannot push down this subquery",
1312 "Table functions are not supported with union operator",
1313 NULL);
1314 }
1315 else if (recurType == RECURRING_TUPLES_EMPTY_JOIN_TREE)
1316 {
1317 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1318 "cannot push down this subquery",
1319 "Subqueries without a FROM clause are not supported with "
1320 "union operator", NULL);
1321 }
1322 else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
1323 {
1324 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1325 "cannot push down this subquery",
1326 "Complex subqueries and CTEs are not supported within a "
1327 "UNION", NULL);
1328 }
1329 else if (recurType == RECURRING_TUPLES_VALUES)
1330 {
1331 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1332 "cannot push down this subquery",
1333 "VALUES is not supported within a "
1334 "UNION", NULL);
1335 }
1336
1337 return NULL;
1338 }
1339
1340
1341 /*
1342 * ExtractSetOperationStatementWalker walks over a set operations statment,
1343 * and finds all set operations in the tree.
1344 */
1345 static bool
ExtractSetOperationStatmentWalker(Node * node,List ** setOperationList)1346 ExtractSetOperationStatmentWalker(Node *node, List **setOperationList)
1347 {
1348 if (node == NULL)
1349 {
1350 return false;
1351 }
1352
1353 if (IsA(node, SetOperationStmt))
1354 {
1355 SetOperationStmt *setOperation = (SetOperationStmt *) node;
1356
1357 (*setOperationList) = lappend(*setOperationList, setOperation);
1358 }
1359
1360 bool walkerResult = expression_tree_walker(node,
1361 ExtractSetOperationStatmentWalker,
1362 setOperationList);
1363
1364 return walkerResult;
1365 }
1366
1367
1368 /*
1369 * RelationInfoContainsOnlyRecurringTuples returns false if any of the relations in
1370 * a RelOptInfo is not recurring.
1371 */
1372 static bool
RelationInfoContainsOnlyRecurringTuples(PlannerInfo * plannerInfo,Relids relids)1373 RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo, Relids relids)
1374 {
1375 int relationId = -1;
1376
1377 while ((relationId = bms_next_member(relids, relationId)) >= 0)
1378 {
1379 RangeTblEntry *rangeTableEntry = plannerInfo->simple_rte_array[relationId];
1380
1381 if (FindNodeMatchingCheckFunctionInRangeTableList(list_make1(rangeTableEntry),
1382 IsDistributedTableRTE))
1383 {
1384 /* we already found a distributed table, no need to check further */
1385 return false;
1386 }
1387
1388 /*
1389 * If there are no distributed tables, there should be at least
1390 * one recurring rte.
1391 */
1392 RecurringTuplesType recurType PG_USED_FOR_ASSERTS_ONLY;
1393 Assert(ContainsRecurringRTE(rangeTableEntry, &recurType));
1394 }
1395
1396 return true;
1397 }
1398
1399
1400 /*
1401 * FetchFirstRecurType checks whether the relationInfo
1402 * contains any recurring table expression, namely a reference table,
1403 * or immutable function. If found, FetchFirstRecurType
1404 * returns true.
1405 *
1406 * Note that since relation ids of relationInfo indexes to the range
1407 * table entry list of planner info, planner info is also passed.
1408 */
1409 static RecurringTuplesType
FetchFirstRecurType(PlannerInfo * plannerInfo,Relids relids)1410 FetchFirstRecurType(PlannerInfo *plannerInfo, Relids relids)
1411 {
1412 RecurringTuplesType recurType = RECURRING_TUPLES_INVALID;
1413 int relationId = -1;
1414
1415 while ((relationId = bms_next_member(relids, relationId)) >= 0)
1416 {
1417 RangeTblEntry *rangeTableEntry = plannerInfo->simple_rte_array[relationId];
1418
1419 /* relationInfo has this range table entry */
1420 if (ContainsRecurringRTE(rangeTableEntry, &recurType))
1421 {
1422 return recurType;
1423 }
1424 }
1425
1426 return recurType;
1427 }
1428
1429
1430 /*
1431 * ContainsRecurringRTE returns whether the range table entry contains
1432 * any entry that generates the same set of tuples when repeating it in
1433 * a query on different shards.
1434 */
1435 static bool
ContainsRecurringRTE(RangeTblEntry * rangeTableEntry,RecurringTuplesType * recurType)1436 ContainsRecurringRTE(RangeTblEntry *rangeTableEntry, RecurringTuplesType *recurType)
1437 {
1438 return ContainsRecurringRangeTable(list_make1(rangeTableEntry), recurType);
1439 }
1440
1441
1442 /*
1443 * ContainsRecurringRangeTable returns whether the range table list contains
1444 * any entry that generates the same set of tuples when repeating it in
1445 * a query on different shards.
1446 */
1447 static bool
ContainsRecurringRangeTable(List * rangeTable,RecurringTuplesType * recurType)1448 ContainsRecurringRangeTable(List *rangeTable, RecurringTuplesType *recurType)
1449 {
1450 return range_table_walker(rangeTable, HasRecurringTuples, recurType,
1451 QTW_EXAMINE_RTES_BEFORE);
1452 }
1453
1454
1455 /*
1456 * HasRecurringTuples returns whether any part of the expression will generate
1457 * the same set of tuples in every query on shards when executing a distributed
1458 * query.
1459 */
1460 static bool
HasRecurringTuples(Node * node,RecurringTuplesType * recurType)1461 HasRecurringTuples(Node *node, RecurringTuplesType *recurType)
1462 {
1463 if (node == NULL)
1464 {
1465 return false;
1466 }
1467
1468 if (IsA(node, RangeTblEntry))
1469 {
1470 RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
1471
1472 if (rangeTableEntry->rtekind == RTE_RELATION)
1473 {
1474 Oid relationId = rangeTableEntry->relid;
1475 if (IsCitusTableType(relationId, REFERENCE_TABLE))
1476 {
1477 *recurType = RECURRING_TUPLES_REFERENCE_TABLE;
1478
1479 /*
1480 * Tuples from reference tables will recur in every query on shards
1481 * that includes it.
1482 */
1483 return true;
1484 }
1485 }
1486 else if (rangeTableEntry->rtekind == RTE_FUNCTION)
1487 {
1488 List *functionList = rangeTableEntry->functions;
1489
1490 if (list_length(functionList) == 1 &&
1491 ContainsReadIntermediateResultFunction((Node *) functionList))
1492 {
1493 *recurType = RECURRING_TUPLES_RESULT_FUNCTION;
1494 }
1495 else
1496 {
1497 *recurType = RECURRING_TUPLES_FUNCTION;
1498 }
1499
1500 /*
1501 * Tuples from functions will recur in every query on shards that includes
1502 * it.
1503 */
1504 return true;
1505 }
1506 else if (rangeTableEntry->rtekind == RTE_RESULT)
1507 {
1508 *recurType = RECURRING_TUPLES_EMPTY_JOIN_TREE;
1509 return true;
1510 }
1511 else if (rangeTableEntry->rtekind == RTE_VALUES)
1512 {
1513 *recurType = RECURRING_TUPLES_VALUES;
1514 return true;
1515 }
1516
1517 return false;
1518 }
1519 else if (IsA(node, Query))
1520 {
1521 Query *query = (Query *) node;
1522
1523 if (HasEmptyJoinTree(query))
1524 {
1525 *recurType = RECURRING_TUPLES_EMPTY_JOIN_TREE;
1526
1527 /*
1528 * Queries with empty join trees will recur in every query on shards
1529 * that includes it.
1530 */
1531 return true;
1532 }
1533
1534 return query_tree_walker((Query *) node, HasRecurringTuples,
1535 recurType, QTW_EXAMINE_RTES_BEFORE);
1536 }
1537
1538 return expression_tree_walker(node, HasRecurringTuples, recurType);
1539 }
1540
1541
/*
 * SubqueryPushdownMultiNodeTree creates logical plan for subquery pushdown logic.
 * Note that this logic will be changed in next iterations, so we decoupled it
 * from other parts of code although it causes some code duplication.
 *
 * Current subquery pushdown support in MultiTree logic requires a single range
 * table entry in the top most from clause. Therefore we inject a synthetic
 * query derived from the top level query and make it the only range table
 * entry for the top level query. This way we can push down any subquery joins
 * down to workers without invoking join order planner.
 *
 * The input query is copied first, so originalQuery itself is not modified by
 * the Var rewriting below. Raises an ERROR (via RaiseDeferredError) when
 * DeferErrorIfQueryNotSupported rejects the query. Returns the top node of the
 * tree: MultiExtendedOp -> MultiProject -> MultiCollect -> MultiTable.
 */
static MultiNode *
SubqueryPushdownMultiNodeTree(Query *originalQuery)
{
	Query *queryTree = copyObject(originalQuery);
	List *targetEntryList = queryTree->targetList;
	MultiCollect *subqueryCollectNode = CitusMakeNode(MultiCollect);

	/* verify we can perform distributed planning on this query */
	DeferredErrorMessage *unsupportedQueryError = DeferErrorIfQueryNotSupported(
		queryTree);
	if (unsupportedQueryError != NULL)
	{
		RaiseDeferredError(unsupportedQueryError, ERROR);
	}

	/*
	 * We would be creating a new Query and pushing down top level query's
	 * contents down to it. Join and filter clauses in higher level query would
	 * be transferred to lower query. Therefore after this function we would
	 * only have a single range table entry in the top level query. We need to
	 * create a target list entry in lower query for each column reference in
	 * upper level query's target list and having clauses. Any column reference
	 * in the upper query will be updated to have varno=1, and varattno=<resno>
	 * of matching target entry in pushed down query.
	 * Consider query
	 *      SELECT s1.a, sum(s2.c)
	 *      FROM (some subquery) s1, (some subquery) s2
	 *      WHERE s1.a = s2.a
	 *      GROUP BY s1.a
	 *      HAVING avg(s2.b);
	 *
	 * We want to prepare a multi tree to avoid subquery joins at top level,
	 * therefore above query is converted to an equivalent
	 *      SELECT worker_column_0, sum(worker_column_1)
	 *      FROM (
	 *              SELECT
	 *                      s1.a AS worker_column_0,
	 *                      s2.c AS worker_column_1,
	 *                      s2.b AS worker_column_2
	 *              FROM (some subquery) s1, (some subquery) s2
	 *              WHERE s1.a = s2.a) worker_subquery
	 *      GROUP BY worker_column_0
	 *      HAVING avg(worker_column_2);
	 *  After this conversion MultiTree is created as follows
	 *
	 *  MultiExtendedOpNode(
	 *      targetList : worker_column_0, sum(worker_column_1)
	 *      groupBy : worker_column_0
	 *      having :  avg(worker_column_2))
	 * --->MultiProject (worker_column_0, worker_column_1, worker_column_2)
	 * --->--->	MultiTable (subquery : worker_subquery)
	 *
	 * Master and worker queries will be created out of this MultiTree at later stages.
	 */

	/*
	 * columnList contains all columns returned by subquery. Subquery target
	 * entry list, subquery range table entry's column name list are derived from
	 * columnList. Columns mentioned in multiProject node and multiExtendedOp
	 * node are indexed with their respective position in columnList.
	 *
	 * CreateSubqueryTargetListAndAdjustVars rewrites these Vars in place. This
	 * is how the references inside queryTree's target list and HAVING clause
	 * get redirected to the new subquery; it relies on the pull_* helpers
	 * returning the Var nodes themselves rather than copies.
	 */
	List *targetColumnList = pull_vars_of_level((Node *) targetEntryList, 0);
	List *havingClauseColumnList = pull_var_clause_default(queryTree->havingQual);
	List *columnList = list_concat(targetColumnList, havingClauseColumnList);

	/* create a target entry for each unique column */
	List *subqueryTargetEntryList = CreateSubqueryTargetListAndAdjustVars(columnList);

	/* new query only has target entries, join tree, and rtable*/
	Query *pushedDownQuery = makeNode(Query);
	pushedDownQuery->commandType = queryTree->commandType;
	pushedDownQuery->targetList = subqueryTargetEntryList;
	pushedDownQuery->jointree = copyObject(queryTree->jointree);
	pushedDownQuery->rtable = copyObject(queryTree->rtable);
	pushedDownQuery->setOperations = copyObject(queryTree->setOperations);
	pushedDownQuery->querySource = queryTree->querySource;
	pushedDownQuery->hasSubLinks = queryTree->hasSubLinks;

	MultiTable *subqueryNode = MultiSubqueryPushdownTable(pushedDownQuery);

	SetChild((MultiUnaryNode *) subqueryCollectNode, (MultiNode *) subqueryNode);
	MultiNode *currentTopNode = (MultiNode *) subqueryCollectNode;

	/* build project node for the columns to project */
	MultiProject *projectNode = MultiProjectNode(targetEntryList);
	SetChild((MultiUnaryNode *) projectNode, currentTopNode);
	currentTopNode = (MultiNode *) projectNode;

	/*
	 * We build the extended operator node to capture aggregate functions, group
	 * clauses, sort clauses, limit/offset clauses, and expressions. We need to
	 * distinguish between aggregates and expressions; and we address this later
	 * in the logical optimizer.
	 */
	MultiExtendedOp *extendedOpNode = MultiExtendedOpNode(queryTree, originalQuery);

	/*
	 * Postgres standard planner converts having qual node to a list of and
	 * clauses and expects havingQual to be of type List when executing the
	 * query later. This function is called on an original query, therefore
	 * havingQual has not been converted yet. Perform conversion here.
	 */
	if (extendedOpNode->havingQual != NULL &&
		!IsA(extendedOpNode->havingQual, List))
	{
		extendedOpNode->havingQual =
			(Node *) make_ands_implicit((Expr *) extendedOpNode->havingQual);
	}

	/*
	 * Group by on primary key allows all columns to appear in the target
	 * list, but once we wrap the join tree into a subquery the GROUP BY
	 * will no longer directly refer to the primary key and referencing
	 * columns that are not in the GROUP BY would result in an error. To
	 * prevent that we wrap all the columns that do not appear in the
	 * GROUP BY in an any_value aggregate.
	 *
	 * Note that the HAVING rewrite is passed the already-wrapped target list.
	 */
	if (extendedOpNode->groupClauseList != NIL)
	{
		extendedOpNode->targetList = (List *) WrapUngroupedVarsInAnyValueAggregate(
			(Node *) extendedOpNode->targetList,
			extendedOpNode->groupClauseList,
			extendedOpNode->targetList, true);

		extendedOpNode->havingQual = WrapUngroupedVarsInAnyValueAggregate(
			(Node *) extendedOpNode->havingQual,
			extendedOpNode->groupClauseList,
			extendedOpNode->targetList, false);
	}

	/*
	 * Postgres standard planner evaluates expressions in the LIMIT/OFFSET clauses.
	 * Since we're using original query here, we should manually evaluate the
	 * expression on the LIMIT and OFFSET clauses. Note that logical optimizer
	 * expects those clauses to be already evaluated.
	 */
	extendedOpNode->limitCount =
		PartiallyEvaluateExpression(extendedOpNode->limitCount, NULL);
	extendedOpNode->limitOffset =
		PartiallyEvaluateExpression(extendedOpNode->limitOffset, NULL);

	SetChild((MultiUnaryNode *) extendedOpNode, currentTopNode);
	currentTopNode = (MultiNode *) extendedOpNode;

	return currentTopNode;
}
1699
1700
1701 /*
1702 * CreateSubqueryTargetListAndAdjustVars creates a target entry for each unique
1703 * column in the column list, adjusts the columns to point into the subquery target
1704 * list and returns the new subquery target list.
1705 */
1706 static List *
CreateSubqueryTargetListAndAdjustVars(List * columnList)1707 CreateSubqueryTargetListAndAdjustVars(List *columnList)
1708 {
1709 Var *column = NULL;
1710 List *subqueryTargetEntryList = NIL;
1711
1712 foreach_ptr(column, columnList)
1713 {
1714 /*
1715 * To avoid adding the same column multiple times, we first check whether there
1716 * is already a target entry containing a Var with the given varno and varattno.
1717 */
1718 AttrNumber resNo = FindResnoForVarInTargetList(subqueryTargetEntryList,
1719 column->varno, column->varattno);
1720 if (resNo == InvalidAttrNumber)
1721 {
1722 /* Var is not yet on the target list, create a new entry */
1723 resNo = list_length(subqueryTargetEntryList) + 1;
1724
1725 /*
1726 * The join tree in the subquery is an exact duplicate of the original
1727 * query. Hence, we can make a copy of the original Var. However, if the
1728 * original Var was in a sublink it would be pointing up whereas now it
1729 * will be placed directly on the target list. Hence we reset the
1730 * varlevelsup.
1731 */
1732 Var *subqueryTargetListVar = (Var *) copyObject(column);
1733
1734 subqueryTargetListVar->varlevelsup = 0;
1735
1736 TargetEntry *newTargetEntry = makeNode(TargetEntry);
1737 newTargetEntry->expr = (Expr *) subqueryTargetListVar;
1738 newTargetEntry->resname = WorkerColumnName(resNo);
1739 newTargetEntry->resjunk = false;
1740 newTargetEntry->resno = resNo;
1741
1742 subqueryTargetEntryList = lappend(subqueryTargetEntryList, newTargetEntry);
1743 }
1744
1745 /*
1746 * Change the original column reference to point to the target list
1747 * entry in the subquery. There is only 1 subquery, so the varno is 1.
1748 */
1749 column->varno = 1;
1750 column->varattno = resNo;
1751 }
1752
1753 return subqueryTargetEntryList;
1754 }
1755
1756
1757 /*
1758 * FindResnoForVarInTargetList finds a Var on a target list that has the given varno
1759 * (range table entry number) and varattno (column number) and returns the resno
1760 * of the target list entry.
1761 */
1762 static AttrNumber
FindResnoForVarInTargetList(List * targetList,int varno,int varattno)1763 FindResnoForVarInTargetList(List *targetList, int varno, int varattno)
1764 {
1765 TargetEntry *targetEntry = NULL;
1766 foreach_ptr(targetEntry, targetList)
1767 {
1768 if (!IsA(targetEntry->expr, Var))
1769 {
1770 continue;
1771 }
1772
1773 Var *targetEntryVar = (Var *) targetEntry->expr;
1774
1775 if (targetEntryVar->varno == varno && targetEntryVar->varattno == varattno)
1776 {
1777 return targetEntry->resno;
1778 }
1779 }
1780
1781 return InvalidAttrNumber;
1782 }
1783
1784
1785 /*
1786 * MultiSubqueryPushdownTable creates a MultiTable from the given subquery,
1787 * populates column list and returns the multitable.
1788 */
1789 static MultiTable *
MultiSubqueryPushdownTable(Query * subquery)1790 MultiSubqueryPushdownTable(Query *subquery)
1791 {
1792 StringInfo rteName = makeStringInfo();
1793 List *columnNamesList = NIL;
1794 ListCell *targetEntryCell = NULL;
1795
1796 appendStringInfo(rteName, "worker_subquery");
1797
1798 foreach(targetEntryCell, subquery->targetList)
1799 {
1800 TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
1801 columnNamesList = lappend(columnNamesList, makeString(targetEntry->resname));
1802 }
1803
1804 MultiTable *subqueryTableNode = CitusMakeNode(MultiTable);
1805 subqueryTableNode->subquery = subquery;
1806 subqueryTableNode->relationId = SUBQUERY_PUSHDOWN_RELATION_ID;
1807 subqueryTableNode->rangeTableId = SUBQUERY_RANGE_TABLE_ID;
1808 subqueryTableNode->partitionColumn = PartitionColumnForPushedDownSubquery(subquery);
1809 subqueryTableNode->alias = makeNode(Alias);
1810 subqueryTableNode->alias->aliasname = rteName->data;
1811 subqueryTableNode->referenceNames = makeNode(Alias);
1812 subqueryTableNode->referenceNames->aliasname = rteName->data;
1813 subqueryTableNode->referenceNames->colnames = columnNamesList;
1814
1815 return subqueryTableNode;
1816 }
1817
1818
1819 /*
1820 * PartitionColumnForPushedDownSubquery finds the partition column on the target
1821 * list of a pushed down subquery.
1822 */
1823 static Var *
PartitionColumnForPushedDownSubquery(Query * query)1824 PartitionColumnForPushedDownSubquery(Query *query)
1825 {
1826 List *targetEntryList = query->targetList;
1827
1828 TargetEntry *targetEntry = NULL;
1829 foreach_ptr(targetEntry, targetEntryList)
1830 {
1831 if (targetEntry->resjunk)
1832 {
1833 continue;
1834 }
1835
1836 Expr *targetExpression = targetEntry->expr;
1837 if (IsA(targetExpression, Var))
1838 {
1839 bool skipOuterVars = true;
1840 bool isPartitionColumn = IsPartitionColumn(targetExpression, query,
1841 skipOuterVars);
1842 if (isPartitionColumn)
1843 {
1844 Var *partitionColumn = copyObject((Var *) targetExpression);
1845
1846 /* the pushed down subquery is the only range table entry */
1847 partitionColumn->varno = 1;
1848
1849 /* point the var to the position in the subquery target list */
1850 partitionColumn->varattno = targetEntry->resno;
1851
1852 return partitionColumn;
1853 }
1854 }
1855 }
1856
1857 return NULL;
1858 }
1859