1 /*-------------------------------------------------------------------------
2  *
3  * query_pushdown_planning.c
4  *
5  * Routines for creating pushdown plans for queries. Both select and modify
6  * queries can be planned using query pushdown logic passing the checks given
7  * in this file.
8  *
 * The checks determine whether the query can be sent to worker nodes by simply
 * adding shard_id to table names while still getting the correct result from
 * them. That means all the required data is present on the workers.
12  *
 * For select queries, Citus tries to use the query pushdown planner if the query
 * has subqueries or function RTEs. For modify queries, Citus tries to use the
 * query pushdown planner if the query accesses multiple tables.
16  *
17  * Copyright (c) Citus Data, Inc.
18  *
19  *-------------------------------------------------------------------------
20  */
21 
22 #include "postgres.h"
23 
24 #include "distributed/pg_version_constants.h"
25 
26 #include "distributed/citus_clauses.h"
27 #include "distributed/citus_ruleutils.h"
28 #include "distributed/deparse_shard_query.h"
29 #include "distributed/listutils.h"
30 #include "distributed/metadata_cache.h"
31 #include "distributed/multi_logical_optimizer.h"
32 #include "distributed/multi_logical_planner.h"
33 #include "distributed/multi_router_planner.h"
34 #include "distributed/pg_dist_partition.h"
35 #include "distributed/query_utils.h"
36 #include "distributed/query_pushdown_planning.h"
37 #include "distributed/recursive_planning.h"
38 #include "distributed/relation_restriction_equivalence.h"
39 #include "distributed/version_compat.h"
40 #include "nodes/nodeFuncs.h"
41 #include "nodes/makefuncs.h"
42 #include "optimizer/optimizer.h"
43 #include "nodes/pg_list.h"
44 #include "optimizer/clauses.h"
45 #include "parser/parsetree.h"
46 
47 
48 /*
49  * RecurringTuplesType is used to distinguish different types of expressions
50  * that always produce the same set of tuples when a shard is queried. We make
51  * this distinction to produce relevant error messages when recurring tuples
52  * are used in a way that would give incorrect results.
53  */
54 typedef enum RecurringTuplesType
55 {
56 	RECURRING_TUPLES_INVALID = 0,
57 	RECURRING_TUPLES_REFERENCE_TABLE,
58 	RECURRING_TUPLES_FUNCTION,
59 	RECURRING_TUPLES_EMPTY_JOIN_TREE,
60 	RECURRING_TUPLES_RESULT_FUNCTION,
61 	RECURRING_TUPLES_VALUES
62 } RecurringTuplesType;
63 
64 
/* Config variables managed via guc.c */
bool SubqueryPushdown = false; /* is subquery pushdown enabled */

/*
 * NOTE(review): not read in this part of the file; presumably a threshold for
 * materializing VALUES lists, confirm at its use site.
 */
int ValuesMaterializationThreshold = 100;

/*
 * Local (file-static) functions forward declarations; their definitions live
 * in this translation unit.
 */
static bool JoinTreeContainsSubqueryWalker(Node *joinTreeNode, void *context);
static bool IsFunctionOrValuesRTE(Node *node);
static bool IsOuterJoinExpr(Node *node);
static bool WindowPartitionOnDistributionColumn(Query *query);
static DeferredErrorMessage * DeferErrorIfFromClauseRecurs(Query *queryTree);
static RecurringTuplesType FromClauseRecurringTupleType(Query *queryTree);
static DeferredErrorMessage * DeferredErrorIfUnsupportedRecurringTuplesJoin(
	PlannerRestrictionContext *plannerRestrictionContext);
static DeferredErrorMessage * DeferErrorIfUnsupportedTableCombination(Query *queryTree);
static DeferredErrorMessage * DeferErrorIfSubqueryRequiresMerge(Query *subqueryTree);
static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationList);
static RecurringTuplesType FetchFirstRecurType(PlannerInfo *plannerInfo,
											   Relids relids);
static bool ContainsRecurringRTE(RangeTblEntry *rangeTableEntry,
								 RecurringTuplesType *recurType);
static bool ContainsRecurringRangeTable(List *rangeTable, RecurringTuplesType *recurType);
static bool HasRecurringTuples(Node *node, RecurringTuplesType *recurType);
static MultiNode * SubqueryPushdownMultiNodeTree(Query *queryTree);
static MultiTable * MultiSubqueryPushdownTable(Query *subquery);
static List * CreateSubqueryTargetListAndAdjustVars(List *columnList);
static AttrNumber FindResnoForVarInTargetList(List *targetList, int varno, int varattno);
static bool RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo,
													Relids relids);
static Var * PartitionColumnForPushedDownSubquery(Query *query);
94 
95 
96 /*
97  * ShouldUseSubqueryPushDown determines whether it's desirable to use
98  * subquery pushdown to plan the query based on the original and
99  * rewritten query.
100  */
101 bool
ShouldUseSubqueryPushDown(Query * originalQuery,Query * rewrittenQuery,PlannerRestrictionContext * plannerRestrictionContext)102 ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
103 						  PlannerRestrictionContext *plannerRestrictionContext)
104 {
105 	/*
106 	 * We check the existence of subqueries in FROM clause on the modified query
107 	 * given that if postgres already flattened the subqueries, MultiNodeTree()
108 	 * can plan corresponding distributed plan.
109 	 */
110 	if (JoinTreeContainsSubquery(rewrittenQuery))
111 	{
112 		return true;
113 	}
114 
115 	/*
116 	 * We check the existence of subqueries in WHERE and HAVING clause on the
117 	 * modified query. In some cases subqueries in the original query are
118 	 * converted into inner joins and in those cases MultiNodeTree() can plan
119 	 * the rewritten plan.
120 	 */
121 	if (WhereOrHavingClauseContainsSubquery(rewrittenQuery))
122 	{
123 		return true;
124 	}
125 
126 	/*
127 	 * We check the existence of subqueries in the SELECT clause on the modified
128 	 * query.
129 	 */
130 	if (TargetListContainsSubquery(rewrittenQuery->targetList))
131 	{
132 		return true;
133 	}
134 
135 
136 	/*
137 	 * We check if postgres planned any semi joins, MultiNodeTree doesn't
138 	 * support these so we fail. Postgres is able to replace some IN/ANY
139 	 * subqueries with semi joins and then replace those with inner joins (ones
140 	 * where the subquery returns unique results). This allows MultiNodeTree to
141 	 * execute these subqueries (because they are converted to inner joins).
142 	 * However, even in that case the rewrittenQuery still contains join nodes
143 	 * with jointype JOIN_SEMI because Postgres doesn't actually update these.
144 	 * The way we find out instead if it actually planned semi joins, is by
145 	 * checking the joins that were sent to multi_join_restriction_hook. If no
146 	 * joins of type JOIN_SEMI are sent it is safe to convert all JOIN_SEMI
147 	 * nodes to JOIN_INNER nodes (which is what is done in MultiNodeTree).
148 	 */
149 	JoinRestrictionContext *joinRestrictionContext =
150 		plannerRestrictionContext->joinRestrictionContext;
151 	if (joinRestrictionContext->hasSemiJoin)
152 	{
153 		return true;
154 	}
155 
156 
157 	/*
158 	 * We process function and VALUES RTEs as subqueries, since the join order planner
159 	 * does not know how to handle them.
160 	 */
161 	if (FindNodeMatchingCheckFunction((Node *) originalQuery, IsFunctionOrValuesRTE))
162 	{
163 		return true;
164 	}
165 
166 	/*
167 	 * We handle outer joins as subqueries, since the join order planner
168 	 * does not know how to handle them.
169 	 */
170 	if (FindNodeMatchingCheckFunction((Node *) originalQuery->jointree, IsOuterJoinExpr))
171 	{
172 		return true;
173 	}
174 
175 	/*
176 	 * Original query may not have an outer join while rewritten query does.
177 	 * We should push down in this case.
178 	 * An example of this is https://github.com/citusdata/citus/issues/2739
179 	 * where postgres pulls-up the outer-join in the subquery.
180 	 */
181 	if (FindNodeMatchingCheckFunction((Node *) rewrittenQuery->jointree, IsOuterJoinExpr))
182 	{
183 		return true;
184 	}
185 
186 	/*
187 	 * Some unsupported join clauses in logical planner
188 	 * may be supported by subquery pushdown planner.
189 	 */
190 	List *qualifierList = QualifierList(rewrittenQuery->jointree);
191 	if (DeferErrorIfUnsupportedClause(qualifierList) != NULL)
192 	{
193 		return true;
194 	}
195 
196 	/* check if the query has a window function and it is safe to pushdown */
197 	if (originalQuery->hasWindowFuncs &&
198 		SafeToPushdownWindowFunction(originalQuery, NULL))
199 	{
200 		return true;
201 	}
202 
203 	return false;
204 }
205 
206 
207 /*
208  * JoinTreeContainsSubquery returns true if the input query contains any subqueries
209  * in the join tree (e.g., FROM clause).
210  */
211 bool
JoinTreeContainsSubquery(Query * query)212 JoinTreeContainsSubquery(Query *query)
213 {
214 	FromExpr *joinTree = query->jointree;
215 
216 	if (!joinTree)
217 	{
218 		return false;
219 	}
220 
221 	return JoinTreeContainsSubqueryWalker((Node *) joinTree, query);
222 }
223 
224 
225 /*
226  * HasEmptyJoinTree returns whether the query selects from anything.
227  */
228 bool
HasEmptyJoinTree(Query * query)229 HasEmptyJoinTree(Query *query)
230 {
231 	if (query->rtable == NIL)
232 	{
233 		return true;
234 	}
235 	else if (list_length(query->rtable) == 1)
236 	{
237 		RangeTblEntry *rte = (RangeTblEntry *) linitial(query->rtable);
238 		if (rte->rtekind == RTE_RESULT)
239 		{
240 			return true;
241 		}
242 	}
243 
244 	return false;
245 }
246 
247 
248 /*
249  * JoinTreeContainsSubqueryWalker returns true if the input joinTreeNode
250  * references to a subquery. Otherwise, recurses into the expression.
251  */
252 static bool
JoinTreeContainsSubqueryWalker(Node * joinTreeNode,void * context)253 JoinTreeContainsSubqueryWalker(Node *joinTreeNode, void *context)
254 {
255 	if (joinTreeNode == NULL)
256 	{
257 		return false;
258 	}
259 
260 	if (IsA(joinTreeNode, RangeTblRef))
261 	{
262 		Query *query = (Query *) context;
263 
264 		RangeTblRef *rangeTableRef = (RangeTblRef *) joinTreeNode;
265 		RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableRef->rtindex, query->rtable);
266 
267 		if (rangeTableEntry->rtekind == RTE_SUBQUERY)
268 		{
269 			return true;
270 		}
271 
272 		return false;
273 	}
274 
275 	return expression_tree_walker(joinTreeNode, JoinTreeContainsSubqueryWalker, context);
276 }
277 
278 
279 /*
280  * WhereOrHavingClauseContainsSubquery returns true if the input query contains
281  * any subqueries in the WHERE or HAVING clause.
282  */
283 bool
WhereOrHavingClauseContainsSubquery(Query * query)284 WhereOrHavingClauseContainsSubquery(Query *query)
285 {
286 	if (FindNodeMatchingCheckFunction(query->havingQual, IsNodeSubquery))
287 	{
288 		return true;
289 	}
290 
291 	if (!query->jointree)
292 	{
293 		return false;
294 	}
295 
296 	/*
297 	 * We search the whole jointree here, not just the quals. The reason for
298 	 * this is that the fromlist can contain other FromExpr nodes again or
299 	 * JoinExpr nodes that also have quals. If that's the case we need to check
300 	 * those as well if they contain andy subqueries.
301 	 */
302 	return FindNodeMatchingCheckFunction((Node *) query->jointree, IsNodeSubquery);
303 }
304 
305 
306 /*
307  * TargetList returns true if the input query contains
308  * any subqueries in the WHERE clause.
309  */
310 bool
TargetListContainsSubquery(List * targetList)311 TargetListContainsSubquery(List *targetList)
312 {
313 	return FindNodeMatchingCheckFunction((Node *) targetList, IsNodeSubquery);
314 }
315 
316 
317 /*
318  * IsFunctionRTE determines whether the given node is a function RTE.
319  */
320 static bool
IsFunctionOrValuesRTE(Node * node)321 IsFunctionOrValuesRTE(Node *node)
322 {
323 	if (IsA(node, RangeTblEntry))
324 	{
325 		RangeTblEntry *rangeTblEntry = (RangeTblEntry *) node;
326 
327 		if (rangeTblEntry->rtekind == RTE_FUNCTION ||
328 			rangeTblEntry->rtekind == RTE_VALUES)
329 		{
330 			return true;
331 		}
332 	}
333 
334 	return false;
335 }
336 
337 
338 /*
339  * IsNodeSubquery returns true if the given node is a Query or SubPlan or a
340  * Param node with paramkind PARAM_EXEC.
341  *
342  * The check for SubPlan is needed when this is used on a already rewritten
343  * query. Such a query has SubPlan nodes instead of SubLink nodes (which
344  * contain a Query node).
345  * The check for PARAM_EXEC is needed because some very simple subqueries like
346  * (select 1) are converted to init plans in the rewritten query. In this case
347  * the only thing left in the query tree is a Param node with type PARAM_EXEC.
348  */
349 bool
IsNodeSubquery(Node * node)350 IsNodeSubquery(Node *node)
351 {
352 	if (node == NULL)
353 	{
354 		return false;
355 	}
356 
357 	if (IsA(node, Query) || IsA(node, SubPlan))
358 	{
359 		return true;
360 	}
361 
362 	if (!IsA(node, Param))
363 	{
364 		return false;
365 	}
366 	return ((Param *) node)->paramkind == PARAM_EXEC;
367 }
368 
369 
370 /*
371  * IsOuterJoinExpr returns whether the given node is an outer join expression.
372  */
373 static bool
IsOuterJoinExpr(Node * node)374 IsOuterJoinExpr(Node *node)
375 {
376 	bool isOuterJoin = false;
377 
378 	if (node == NULL)
379 	{
380 		return false;
381 	}
382 
383 	if (IsA(node, JoinExpr))
384 	{
385 		JoinExpr *joinExpr = (JoinExpr *) node;
386 		JoinType joinType = joinExpr->jointype;
387 		if (IS_OUTER_JOIN(joinType))
388 		{
389 			isOuterJoin = true;
390 		}
391 	}
392 
393 	return isOuterJoin;
394 }
395 
396 
397 /*
398  * SafeToPushdownWindowFunction checks if the query with window function is supported.
399  * Returns the result accordingly and modifies errorDetail if non null.
400  */
401 bool
SafeToPushdownWindowFunction(Query * query,StringInfo * errorDetail)402 SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail)
403 {
404 	ListCell *windowClauseCell = NULL;
405 	List *windowClauseList = query->windowClause;
406 
407 	/*
408 	 * We need to check each window clause separately if there is a partition by clause
409 	 * and if it is partitioned on the distribution column.
410 	 */
411 	foreach(windowClauseCell, windowClauseList)
412 	{
413 		WindowClause *windowClause = lfirst(windowClauseCell);
414 
415 		if (!windowClause->partitionClause)
416 		{
417 			if (errorDetail)
418 			{
419 				*errorDetail = makeStringInfo();
420 				appendStringInfoString(*errorDetail,
421 									   "Window functions without PARTITION BY on distribution "
422 									   "column is currently unsupported");
423 			}
424 			return false;
425 		}
426 	}
427 
428 	if (!WindowPartitionOnDistributionColumn(query))
429 	{
430 		if (errorDetail)
431 		{
432 			*errorDetail = makeStringInfo();
433 			appendStringInfoString(*errorDetail,
434 								   "Window functions with PARTITION BY list missing distribution "
435 								   "column is currently unsupported");
436 		}
437 		return false;
438 	}
439 
440 	return true;
441 }
442 
443 
444 /*
445  * WindowPartitionOnDistributionColumn checks if the given subquery has one
446  * or more window functions and at least one of them is not partitioned by
447  * distribution column. The function returns false if your window function does not
448  * have a partition by clause or it does not include the distribution column.
449  *
450  * Please note that if the query does not have a window function, the function
451  * returns true.
452  */
453 static bool
WindowPartitionOnDistributionColumn(Query * query)454 WindowPartitionOnDistributionColumn(Query *query)
455 {
456 	List *windowClauseList = query->windowClause;
457 	ListCell *windowClauseCell = NULL;
458 
459 	foreach(windowClauseCell, windowClauseList)
460 	{
461 		WindowClause *windowClause = lfirst(windowClauseCell);
462 		List *partitionClauseList = windowClause->partitionClause;
463 		List *targetEntryList = query->targetList;
464 
465 		List *groupTargetEntryList =
466 			GroupTargetEntryList(partitionClauseList, targetEntryList);
467 
468 		bool partitionOnDistributionColumn =
469 			TargetListOnPartitionColumn(query, groupTargetEntryList);
470 
471 		if (!partitionOnDistributionColumn)
472 		{
473 			return false;
474 		}
475 	}
476 
477 	return true;
478 }
479 
480 
481 /*
482  * SubqueryMultiNodeTree gets the query objects and returns logical plan
483  * for subqueries.
484  *
485  * We currently have two different code paths for creating logic plan for subqueries:
486  *   (i) subquery pushdown
487  *   (ii) single relation repartition subquery
488  *
489  * In order to create the logical plan, we follow the algorithm below:
490  *    -  If subquery pushdown planner can plan the query
491  *        -  We're done, we create the multi plan tree and return
492  *    -  Else
493  *       - If the query is not eligible for single table repartition subquery planning
494  *            - Throw the error that the subquery pushdown planner generated
495  *       - If it is eligible for single table repartition subquery planning
496  *            - Check for the errors for single table repartition subquery planning
497  *                - If no errors found, we're done. Create the multi plan and return
498  *                - If found errors, throw it
499  */
500 MultiNode *
SubqueryMultiNodeTree(Query * originalQuery,Query * queryTree,PlannerRestrictionContext * plannerRestrictionContext)501 SubqueryMultiNodeTree(Query *originalQuery, Query *queryTree,
502 					  PlannerRestrictionContext *plannerRestrictionContext)
503 {
504 	MultiNode *multiQueryNode = NULL;
505 
506 	/*
507 	 * This is a generic error check that applies to both subquery pushdown
508 	 * and single table repartition subquery.
509 	 */
510 	DeferredErrorMessage *unsupportedQueryError = DeferErrorIfQueryNotSupported(
511 		originalQuery);
512 	if (unsupportedQueryError != NULL)
513 	{
514 		RaiseDeferredError(unsupportedQueryError, ERROR);
515 	}
516 
517 	/*
518 	 * In principle, we're first trying subquery pushdown planner. If it fails
519 	 * to create a logical plan, continue with trying the single table
520 	 * repartition subquery planning.
521 	 */
522 	DeferredErrorMessage *subqueryPushdownError = DeferErrorIfUnsupportedSubqueryPushdown(
523 		originalQuery,
524 		plannerRestrictionContext);
525 	if (!subqueryPushdownError)
526 	{
527 		multiQueryNode = SubqueryPushdownMultiNodeTree(originalQuery);
528 	}
529 	else if (subqueryPushdownError)
530 	{
531 		RaiseDeferredErrorInternal(subqueryPushdownError, ERROR);
532 
533 		List *subqueryEntryList = SubqueryEntryList(queryTree);
534 		RangeTblEntry *subqueryRangeTableEntry = (RangeTblEntry *) linitial(
535 			subqueryEntryList);
536 		Assert(subqueryRangeTableEntry->rtekind == RTE_SUBQUERY);
537 
538 		Query *subqueryTree = subqueryRangeTableEntry->subquery;
539 
540 		DeferredErrorMessage *repartitionQueryError =
541 			DeferErrorIfUnsupportedSubqueryRepartition(subqueryTree);
542 		if (repartitionQueryError)
543 		{
544 			RaiseDeferredErrorInternal(repartitionQueryError, ERROR);
545 		}
546 
547 		/* all checks have passed, safe to create the multi plan */
548 		multiQueryNode = MultiNodeTree(queryTree);
549 	}
550 
551 	Assert(multiQueryNode != NULL);
552 
553 	return multiQueryNode;
554 }
555 
556 
557 /*
558  * DeferErrorIfContainsUnsupportedSubqueryPushdown iterates on the query's subquery
559  * entry list and uses helper functions to check if we can push down subquery
560  * to worker nodes. These helper functions returns a deferred error if we
561  * cannot push down the subquery.
562  */
563 DeferredErrorMessage *
DeferErrorIfUnsupportedSubqueryPushdown(Query * originalQuery,PlannerRestrictionContext * plannerRestrictionContext)564 DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
565 										PlannerRestrictionContext *
566 										plannerRestrictionContext)
567 {
568 	bool outerMostQueryHasLimit = false;
569 	ListCell *subqueryCell = NULL;
570 	List *subqueryList = NIL;
571 
572 	if (originalQuery->limitCount != NULL)
573 	{
574 		outerMostQueryHasLimit = true;
575 	}
576 
577 	/*
578 	 * We're checking two things here:
579 	 *    (i)   If the query contains a top level union, ensure that all leaves
580 	 *          return the partition key at the same position
581 	 *    (ii)  Else, check whether all relations joined on the partition key or not
582 	 */
583 	if (ContainsUnionSubquery(originalQuery))
584 	{
585 		if (!SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext))
586 		{
587 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
588 								 "cannot pushdown the subquery since not all subqueries "
589 								 "in the UNION have the partition column in the same "
590 								 "position",
591 								 "Each leaf query of the UNION should return the "
592 								 "partition column in the same position and all joins "
593 								 "must be on the partition column",
594 								 NULL);
595 		}
596 	}
597 	else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext))
598 	{
599 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
600 							 "complex joins are only supported when all distributed tables are "
601 							 "co-located and joined on their distribution columns",
602 							 NULL, NULL);
603 	}
604 
605 	/* we shouldn't allow reference tables in the FROM clause when the query has sublinks */
606 	DeferredErrorMessage *error = DeferErrorIfFromClauseRecurs(originalQuery);
607 	if (error)
608 	{
609 		return error;
610 	}
611 
612 	/* we shouldn't allow reference tables in the outer part of outer joins */
613 	error = DeferredErrorIfUnsupportedRecurringTuplesJoin(plannerRestrictionContext);
614 	if (error)
615 	{
616 		return error;
617 	}
618 
619 	/*
620 	 * We first extract all the queries that appear in the original query. Later,
621 	 * we delete the original query given that error rules does not apply to the
622 	 * top level query. For instance, we could support any LIMIT/ORDER BY on the
623 	 * top level query.
624 	 */
625 	ExtractQueryWalker((Node *) originalQuery, &subqueryList);
626 	subqueryList = list_delete(subqueryList, originalQuery);
627 
628 	/* iterate on the subquery list and error out accordingly */
629 	foreach(subqueryCell, subqueryList)
630 	{
631 		Query *subquery = lfirst(subqueryCell);
632 		error = DeferErrorIfCannotPushdownSubquery(subquery,
633 												   outerMostQueryHasLimit);
634 		if (error)
635 		{
636 			return error;
637 		}
638 	}
639 
640 	return NULL;
641 }
642 
643 
644 /*
645  * DeferErrorIfFromClauseRecurs returns a deferred error if the
646  * given query is not suitable for subquery pushdown.
647  *
648  * While planning sublinks, we rely on Postgres in the sense that it converts some of
649  * sublinks into joins.
650  *
651  * In some cases, sublinks are pulled up and converted into outer joins. Those cases
652  * are already handled with DeferredErrorIfUnsupportedRecurringTuplesJoin().
653  *
654  * If the sublinks are not pulled up, we should still error out in if the expression
655  * in the FROM clause would recur for every shard in a subquery on the WHERE clause.
656  *
657  * Otherwise, the result would include duplicate rows.
658  */
659 static DeferredErrorMessage *
DeferErrorIfFromClauseRecurs(Query * queryTree)660 DeferErrorIfFromClauseRecurs(Query *queryTree)
661 {
662 	if (!queryTree->hasSubLinks)
663 	{
664 		return NULL;
665 	}
666 
667 	RecurringTuplesType recurType = FromClauseRecurringTupleType(queryTree);
668 	if (recurType == RECURRING_TUPLES_REFERENCE_TABLE)
669 	{
670 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
671 							 "correlated subqueries are not supported when "
672 							 "the FROM clause contains a reference table", NULL, NULL);
673 	}
674 	else if (recurType == RECURRING_TUPLES_FUNCTION)
675 	{
676 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
677 							 "correlated subqueries are not supported when "
678 							 "the FROM clause contains a set returning function", NULL,
679 							 NULL);
680 	}
681 	else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
682 	{
683 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
684 							 "correlated subqueries are not supported when "
685 							 "the FROM clause contains a CTE or subquery", NULL, NULL);
686 	}
687 	else if (recurType == RECURRING_TUPLES_EMPTY_JOIN_TREE)
688 	{
689 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
690 							 "correlated subqueries are not supported when "
691 							 "the FROM clause contains a subquery without FROM", NULL,
692 							 NULL);
693 	}
694 	else if (recurType == RECURRING_TUPLES_VALUES)
695 	{
696 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
697 							 "correlated subqueries are not supported when "
698 							 "the FROM clause contains VALUES", NULL,
699 							 NULL);
700 	}
701 
702 
703 	/*
704 	 * We get here when there is neither a distributed table, nor recurring tuples.
705 	 * That usually means that there isn't a FROM at all (only sublinks), this
706 	 * implies that queryTree is recurring, but whether this is a problem depends
707 	 * on outer queries, not on queryTree itself.
708 	 */
709 
710 	return NULL;
711 }
712 
713 
714 /*
715  * FromClauseRecurringTupleType returns tuple recurrence information
716  * in query result based on range table entries in from clause.
717  *
718  * Returned information is used to prepare appropriate deferred error
719  * message for subquery pushdown checks.
720  */
721 static RecurringTuplesType
FromClauseRecurringTupleType(Query * queryTree)722 FromClauseRecurringTupleType(Query *queryTree)
723 {
724 	RecurringTuplesType recurType = RECURRING_TUPLES_INVALID;
725 
726 	if (HasEmptyJoinTree(queryTree))
727 	{
728 		return RECURRING_TUPLES_EMPTY_JOIN_TREE;
729 	}
730 
731 	if (FindNodeMatchingCheckFunctionInRangeTableList(queryTree->rtable,
732 													  IsDistributedTableRTE))
733 	{
734 		/*
735 		 * There is a distributed table somewhere in the FROM clause.
736 		 *
737 		 * In the typical case this means that the query does not recur,
738 		 * but there are two exceptions:
739 		 *
740 		 * - outer joins such as reference_table LEFT JOIN distributed_table
741 		 * - FROM reference_table WHERE .. (SELECT .. FROM distributed_table) ..
742 		 *
743 		 * However, we check all subqueries and joins separately, so we would
744 		 * find such conditions in other calls.
745 		 */
746 		return RECURRING_TUPLES_INVALID;
747 	}
748 
749 	/*
750 	 * Try to figure out which type of recurring tuples we have to produce a
751 	 * relevant error message. If there are several we'll pick the first one.
752 	 */
753 	ContainsRecurringRangeTable(queryTree->rtable, &recurType);
754 
755 	return recurType;
756 }
757 
758 
759 /*
760  * DeferredErrorIfUnsupportedRecurringTuplesJoin returns true if there exists a outer join
761  * between reference table and distributed tables which does not follow
762  * the rules :
763  * - Reference tables can not be located in the outer part of the semi join or the
764  * anti join. Otherwise, we may have duplicate results. Although getting duplicate
765  * results is not possible by checking the equality on the column of the reference
766  * table and partition column of distributed table, we still keep these checks.
767  * Because, using the reference table in the outer part of the semi join or anti
768  * join is not very common.
769  * - Reference tables can not be located in the outer part of the left join
770  * (Note that PostgreSQL converts right joins to left joins. While converting
771  * join types, innerrel and outerrel are also switched.) Otherwise we will
772  * definitely have duplicate rows. Beside, reference tables can not be used
773  * with full outer joins because of the same reason.
774  */
775 static DeferredErrorMessage *
DeferredErrorIfUnsupportedRecurringTuplesJoin(PlannerRestrictionContext * plannerRestrictionContext)776 DeferredErrorIfUnsupportedRecurringTuplesJoin(
777 	PlannerRestrictionContext *plannerRestrictionContext)
778 {
779 	List *joinRestrictionList =
780 		plannerRestrictionContext->joinRestrictionContext->joinRestrictionList;
781 	ListCell *joinRestrictionCell = NULL;
782 	RecurringTuplesType recurType = RECURRING_TUPLES_INVALID;
783 	foreach(joinRestrictionCell, joinRestrictionList)
784 	{
785 		JoinRestriction *joinRestriction = (JoinRestriction *) lfirst(
786 			joinRestrictionCell);
787 		JoinType joinType = joinRestriction->joinType;
788 		PlannerInfo *plannerInfo = joinRestriction->plannerInfo;
789 		Relids innerrelRelids = joinRestriction->innerrelRelids;
790 		Relids outerrelRelids = joinRestriction->outerrelRelids;
791 
792 		if (joinType == JOIN_SEMI || joinType == JOIN_ANTI || joinType == JOIN_LEFT)
793 		{
794 			/*
795 			 * If there are only recurring tuples on the inner side of a join then
796 			 * we can push it down, regardless of whether the outer side is
797 			 * recurring or not. Otherwise, we check the outer side for recurring
798 			 * tuples.
799 			 */
800 			if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, innerrelRelids))
801 			{
802 				continue;
803 			}
804 
805 
806 			/*
807 			 * If the outer side of the join doesn't have any distributed tables
808 			 * (e.g., contains only recurring tuples), Citus should not pushdown
809 			 * the query. The reason is that recurring tuples on every shard would
810 			 * be added to the result, which is wrong.
811 			 */
812 			if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, outerrelRelids))
813 			{
814 				/*
815 				 * Find the first (or only) recurring RTE to give a meaningful
816 				 * error to the user.
817 				 */
818 				recurType = FetchFirstRecurType(plannerInfo, outerrelRelids);
819 
820 				break;
821 			}
822 		}
823 		else if (joinType == JOIN_FULL)
824 		{
825 			if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, innerrelRelids))
826 			{
827 				/*
828 				 * Find the first (or only) recurring RTE to give a meaningful
829 				 * error to the user.
830 				 */
831 				recurType = FetchFirstRecurType(plannerInfo, innerrelRelids);
832 
833 				break;
834 			}
835 
836 			if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, outerrelRelids))
837 			{
838 				/*
839 				 * Find the first (or only) recurring RTE to give a meaningful
840 				 * error to the user.
841 				 */
842 				recurType = FetchFirstRecurType(plannerInfo, outerrelRelids);
843 
844 				break;
845 			}
846 		}
847 	}
848 
849 	if (recurType == RECURRING_TUPLES_REFERENCE_TABLE)
850 	{
851 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
852 							 "cannot pushdown the subquery",
853 							 "There exist a reference table in the outer "
854 							 "part of the outer join", NULL);
855 	}
856 	else if (recurType == RECURRING_TUPLES_FUNCTION)
857 	{
858 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
859 							 "cannot pushdown the subquery",
860 							 "There exist a table function in the outer "
861 							 "part of the outer join", NULL);
862 	}
863 	else if (recurType == RECURRING_TUPLES_EMPTY_JOIN_TREE)
864 	{
865 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
866 							 "cannot pushdown the subquery",
867 							 "There exist a subquery without FROM in the outer "
868 							 "part of the outer join", NULL);
869 	}
870 	else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
871 	{
872 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
873 							 "cannot pushdown the subquery",
874 							 "Complex subqueries, CTEs and local tables cannot be in "
875 							 "the outer part of an outer join with a distributed table",
876 							 NULL);
877 	}
878 	else if (recurType == RECURRING_TUPLES_VALUES)
879 	{
880 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
881 							 "cannot pushdown the subquery",
882 							 "There exist a VALUES clause in the outer "
883 							 "part of the outer join", NULL);
884 	}
885 
886 	return NULL;
887 }
888 
889 
890 /*
891  * CanPushdownSubquery checks if we can push down the given
892  * subquery to worker nodes.
893  */
894 bool
CanPushdownSubquery(Query * subqueryTree,bool outerMostQueryHasLimit)895 CanPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLimit)
896 {
897 	return DeferErrorIfCannotPushdownSubquery(subqueryTree, outerMostQueryHasLimit) ==
898 		   NULL;
899 }
900 
901 
902 /*
903  * DeferErrorIfCannotPushdownSubquery checks if we can push down the given
904  * subquery to worker nodes. If we cannot push down the subquery, this function
905  * returns a deferred error.
906  *
907  * We can push down a subquery if it follows rules below:
908  * a. If there is an aggregate, it must be grouped on partition column.
909  * b. If there is a join, it must be between two regular tables or two subqueries.
910  * We don't support join between a regular table and a subquery. And columns on
911  * the join condition must be partition columns.
912  * c. If there is a distinct clause, it must be on the partition column.
913  *
914  * This function is very similar to DeferErrorIfQueryNotSupported() in logical
915  * planner, but we don't reuse it, because differently for subqueries we support
916  * a subset of distinct, union and left joins.
917  *
918  * Note that this list of checks is not exhaustive, there can be some cases
919  * which we let subquery to run but returned results would be wrong. Such as if
920  * a subquery has a group by on another subquery which includes order by with
921  * limit, we let this query to run, but results could be wrong depending on the
922  * features of underlying tables.
923  */
924 DeferredErrorMessage *
DeferErrorIfCannotPushdownSubquery(Query * subqueryTree,bool outerMostQueryHasLimit)925 DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLimit)
926 {
927 	bool preconditionsSatisfied = true;
928 	char *errorDetail = NULL;
929 
930 	DeferredErrorMessage *deferredError = DeferErrorIfUnsupportedTableCombination(
931 		subqueryTree);
932 	if (deferredError)
933 	{
934 		return deferredError;
935 	}
936 
937 	if (HasEmptyJoinTree(subqueryTree) &&
938 		contain_mutable_functions((Node *) subqueryTree->targetList))
939 	{
940 		preconditionsSatisfied = false;
941 		errorDetail = "Subqueries without a FROM clause can only contain immutable "
942 					  "functions";
943 	}
944 
945 	/*
946 	 * Correlated subqueries are effectively functions that are repeatedly called
947 	 * for the values of the vars that point to the outer query. We can liberally
948 	 * push down SQL features within such a function, as long as co-located join
949 	 * checks are applied.
950 	 */
951 	if (!ContainsReferencesToOuterQuery(subqueryTree))
952 	{
953 		deferredError = DeferErrorIfSubqueryRequiresMerge(subqueryTree);
954 		if (deferredError)
955 		{
956 			return deferredError;
957 		}
958 	}
959 
960 	/*
961 	 * Limit is partially supported when SubqueryPushdown is set.
962 	 * The outermost query must have a limit clause.
963 	 */
964 	if (subqueryTree->limitCount && SubqueryPushdown && !outerMostQueryHasLimit)
965 	{
966 		preconditionsSatisfied = false;
967 		errorDetail = "Limit in subquery without limit in the outermost query is "
968 					  "unsupported";
969 	}
970 
971 	if (subqueryTree->setOperations)
972 	{
973 		deferredError = DeferErrorIfUnsupportedUnionQuery(subqueryTree);
974 		if (deferredError)
975 		{
976 			return deferredError;
977 		}
978 	}
979 
980 	if (subqueryTree->hasRecursive)
981 	{
982 		preconditionsSatisfied = false;
983 		errorDetail = "Recursive queries are currently unsupported";
984 	}
985 
986 	if (subqueryTree->cteList)
987 	{
988 		preconditionsSatisfied = false;
989 		errorDetail = "Common Table Expressions are currently unsupported";
990 	}
991 
992 	if (subqueryTree->hasForUpdate)
993 	{
994 		preconditionsSatisfied = false;
995 		errorDetail = "For Update/Share commands are currently unsupported";
996 	}
997 
998 	/* grouping sets are not allowed in subqueries*/
999 	if (subqueryTree->groupingSets)
1000 	{
1001 		preconditionsSatisfied = false;
1002 		errorDetail = "could not run distributed query with GROUPING SETS, CUBE, "
1003 					  "or ROLLUP";
1004 	}
1005 
1006 	deferredError = DeferErrorIfFromClauseRecurs(subqueryTree);
1007 	if (deferredError)
1008 	{
1009 		return deferredError;
1010 	}
1011 
1012 
1013 	/* finally check and return deferred if not satisfied */
1014 	if (!preconditionsSatisfied)
1015 	{
1016 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1017 							 "cannot push down this subquery",
1018 							 errorDetail, NULL);
1019 	}
1020 
1021 	return NULL;
1022 }
1023 
1024 
1025 /*
1026  * DeferErrorIfSubqueryRequiresMerge returns a deferred error if the subquery
1027  * requires a merge step on the coordinator (e.g. limit, group by non-distribution
1028  * column, etc.).
1029  */
1030 static DeferredErrorMessage *
DeferErrorIfSubqueryRequiresMerge(Query * subqueryTree)1031 DeferErrorIfSubqueryRequiresMerge(Query *subqueryTree)
1032 {
1033 	bool preconditionsSatisfied = true;
1034 	char *errorDetail = NULL;
1035 
1036 	if (subqueryTree->limitOffset)
1037 	{
1038 		preconditionsSatisfied = false;
1039 		errorDetail = "Offset clause is currently unsupported when a subquery "
1040 					  "references a column from another query";
1041 	}
1042 
1043 	/* limit is not supported when SubqueryPushdown is not set */
1044 	if (subqueryTree->limitCount && !SubqueryPushdown)
1045 	{
1046 		preconditionsSatisfied = false;
1047 		errorDetail = "Limit in subquery is currently unsupported when a "
1048 					  "subquery references a column from another query";
1049 	}
1050 
1051 	/* group clause list must include partition column */
1052 	if (subqueryTree->groupClause)
1053 	{
1054 		List *groupClauseList = subqueryTree->groupClause;
1055 		List *targetEntryList = subqueryTree->targetList;
1056 		List *groupTargetEntryList = GroupTargetEntryList(groupClauseList,
1057 														  targetEntryList);
1058 		bool groupOnPartitionColumn =
1059 			TargetListOnPartitionColumn(subqueryTree, groupTargetEntryList);
1060 		if (!groupOnPartitionColumn)
1061 		{
1062 			preconditionsSatisfied = false;
1063 			errorDetail = "Group by list without partition column is currently "
1064 						  "unsupported when a subquery references a column "
1065 						  "from another query";
1066 		}
1067 	}
1068 
1069 	/* we don't support aggregates without group by */
1070 	if (subqueryTree->hasAggs && (subqueryTree->groupClause == NULL))
1071 	{
1072 		preconditionsSatisfied = false;
1073 		errorDetail = "Aggregates without group by are currently unsupported "
1074 					  "when a subquery references a column from another query";
1075 	}
1076 
1077 	/* having clause without group by on partition column is not supported */
1078 	if (subqueryTree->havingQual && (subqueryTree->groupClause == NULL))
1079 	{
1080 		preconditionsSatisfied = false;
1081 		errorDetail = "Having qual without group by on partition column is "
1082 					  "currently unsupported when a subquery references "
1083 					  "a column from another query";
1084 	}
1085 
1086 	/*
1087 	 * We support window functions when the window function
1088 	 * is partitioned on distribution column.
1089 	 */
1090 	StringInfo errorInfo = NULL;
1091 	if (subqueryTree->hasWindowFuncs && !SafeToPushdownWindowFunction(subqueryTree,
1092 																	  &errorInfo))
1093 	{
1094 		errorDetail = (char *) errorInfo->data;
1095 		preconditionsSatisfied = false;
1096 	}
1097 
1098 	/* distinct clause list must include partition column */
1099 	if (subqueryTree->distinctClause)
1100 	{
1101 		List *distinctClauseList = subqueryTree->distinctClause;
1102 		List *targetEntryList = subqueryTree->targetList;
1103 		List *distinctTargetEntryList = GroupTargetEntryList(distinctClauseList,
1104 															 targetEntryList);
1105 		bool distinctOnPartitionColumn =
1106 			TargetListOnPartitionColumn(subqueryTree, distinctTargetEntryList);
1107 		if (!distinctOnPartitionColumn)
1108 		{
1109 			preconditionsSatisfied = false;
1110 			errorDetail = "Distinct on columns without partition column is "
1111 						  "currently unsupported";
1112 		}
1113 	}
1114 
1115 	/* finally check and return deferred if not satisfied */
1116 	if (!preconditionsSatisfied)
1117 	{
1118 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1119 							 "cannot push down this subquery",
1120 							 errorDetail, NULL);
1121 	}
1122 
1123 	return NULL;
1124 }
1125 
1126 
1127 /*
1128  * DeferErrorIfUnsupportedTableCombination checks if the given query tree contains any
1129  * unsupported range table combinations. For this, the function walks over all
1130  * range tables in the join tree, and checks if they correspond to simple relations
1131  * or subqueries. It also checks if there is a join between a regular table and
1132  * a subquery and if join is on more than two range table entries. If any error is found,
1133  * a deferred error is returned. Else, NULL is returned.
1134  */
1135 static DeferredErrorMessage *
DeferErrorIfUnsupportedTableCombination(Query * queryTree)1136 DeferErrorIfUnsupportedTableCombination(Query *queryTree)
1137 {
1138 	List *rangeTableList = queryTree->rtable;
1139 	List *joinTreeTableIndexList = NIL;
1140 	int joinTreeTableIndex = 0;
1141 	bool unsupportedTableCombination = false;
1142 	char *errorDetail = NULL;
1143 
1144 	/*
1145 	 * Extract all range table indexes from the join tree. Note that sub-queries
1146 	 * that get pulled up by PostgreSQL don't appear in this join tree.
1147 	 */
1148 	ExtractRangeTableIndexWalker((Node *) queryTree->jointree,
1149 								 &joinTreeTableIndexList);
1150 
1151 	foreach_int(joinTreeTableIndex, joinTreeTableIndexList)
1152 	{
1153 		/*
1154 		 * Join tree's range table index starts from 1 in the query tree. But,
1155 		 * list indexes start from 0.
1156 		 */
1157 		int rangeTableListIndex = joinTreeTableIndex - 1;
1158 
1159 		RangeTblEntry *rangeTableEntry =
1160 			(RangeTblEntry *) list_nth(rangeTableList, rangeTableListIndex);
1161 
1162 		/*
1163 		 * Check if the range table in the join tree is a simple relation, a
1164 		 * subquery, or immutable function.
1165 		 */
1166 		if (rangeTableEntry->rtekind == RTE_RELATION ||
1167 			rangeTableEntry->rtekind == RTE_SUBQUERY ||
1168 			rangeTableEntry->rtekind == RTE_RESULT)
1169 		{
1170 			/* accepted */
1171 		}
1172 		else if (rangeTableEntry->rtekind == RTE_VALUES)
1173 		{
1174 			/*
1175 			 * When GUC is set to -1, we disable materialization, when set to 0,
1176 			 * we materialize everything. Other values are compared against the
1177 			 * length of the values_lists.
1178 			 */
1179 			int valuesRowCount = list_length(rangeTableEntry->values_lists);
1180 			if (ValuesMaterializationThreshold >= 0 &&
1181 				valuesRowCount > ValuesMaterializationThreshold)
1182 			{
1183 				unsupportedTableCombination = true;
1184 				errorDetail = "VALUES has more than "
1185 							  "\"citus.values_materialization_threshold\" "
1186 							  "entries, so it is materialized";
1187 			}
1188 			else if (contain_mutable_functions((Node *) rangeTableEntry->values_lists))
1189 			{
1190 				/* VALUES should not contain mutable functions */
1191 				unsupportedTableCombination = true;
1192 				errorDetail = "Only immutable functions can be used in VALUES";
1193 			}
1194 		}
1195 		else if (rangeTableEntry->rtekind == RTE_FUNCTION)
1196 		{
1197 			List *functionList = rangeTableEntry->functions;
1198 
1199 			if (list_length(functionList) == 1 &&
1200 				ContainsReadIntermediateResultFunction(linitial(functionList)))
1201 			{
1202 				/*
1203 				 * The read_intermediate_result function is volatile, but we know
1204 				 * it has the same result across all nodes and can therefore treat
1205 				 * it as a reference table.
1206 				 */
1207 			}
1208 			else if (contain_mutable_functions((Node *) functionList))
1209 			{
1210 				unsupportedTableCombination = true;
1211 				errorDetail = "Only immutable functions can be used as a table "
1212 							  "expressions in a multi-shard query";
1213 			}
1214 			else
1215 			{
1216 				/* immutable function RTEs are treated as reference tables */
1217 			}
1218 		}
1219 		else if (rangeTableEntry->rtekind == RTE_CTE)
1220 		{
1221 			unsupportedTableCombination = true;
1222 			errorDetail = "CTEs in subqueries are currently unsupported";
1223 			break;
1224 		}
1225 		else
1226 		{
1227 			unsupportedTableCombination = true;
1228 			errorDetail = "Table expressions other than relations, subqueries, "
1229 						  "and immutable functions are currently unsupported";
1230 			break;
1231 		}
1232 	}
1233 
1234 	/* finally check and error out if not satisfied */
1235 	if (unsupportedTableCombination)
1236 	{
1237 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1238 							 "cannot push down this subquery",
1239 							 errorDetail, NULL);
1240 	}
1241 
1242 	return NULL;
1243 }
1244 
1245 
1246 /*
1247  * DeferErrorIfUnsupportedUnionQuery is a helper function for ErrorIfCannotPushdownSubquery().
1248  * The function also errors out for set operations INTERSECT and EXCEPT.
1249  */
1250 DeferredErrorMessage *
DeferErrorIfUnsupportedUnionQuery(Query * subqueryTree)1251 DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree)
1252 {
1253 	List *setOperationStatementList = NIL;
1254 	ListCell *setOperationStatmentCell = NULL;
1255 	RecurringTuplesType recurType = RECURRING_TUPLES_INVALID;
1256 
1257 	ExtractSetOperationStatmentWalker((Node *) subqueryTree->setOperations,
1258 									  &setOperationStatementList);
1259 	foreach(setOperationStatmentCell, setOperationStatementList)
1260 	{
1261 		SetOperationStmt *setOperation =
1262 			(SetOperationStmt *) lfirst(setOperationStatmentCell);
1263 		Node *leftArg = setOperation->larg;
1264 		Node *rightArg = setOperation->rarg;
1265 		int leftArgRTI = 0;
1266 		int rightArgRTI = 0;
1267 
1268 		if (setOperation->op != SETOP_UNION)
1269 		{
1270 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1271 								 "cannot push down this subquery",
1272 								 "Intersect and Except are currently unsupported",
1273 								 NULL);
1274 		}
1275 
1276 		if (IsA(leftArg, RangeTblRef))
1277 		{
1278 			leftArgRTI = ((RangeTblRef *) leftArg)->rtindex;
1279 			Query *leftArgSubquery = rt_fetch(leftArgRTI,
1280 											  subqueryTree->rtable)->subquery;
1281 			recurType = FromClauseRecurringTupleType(leftArgSubquery);
1282 			if (recurType != RECURRING_TUPLES_INVALID)
1283 			{
1284 				break;
1285 			}
1286 		}
1287 
1288 		if (IsA(rightArg, RangeTblRef))
1289 		{
1290 			rightArgRTI = ((RangeTblRef *) rightArg)->rtindex;
1291 			Query *rightArgSubquery = rt_fetch(rightArgRTI,
1292 											   subqueryTree->rtable)->subquery;
1293 			recurType = FromClauseRecurringTupleType(rightArgSubquery);
1294 			if (recurType != RECURRING_TUPLES_INVALID)
1295 			{
1296 				break;
1297 			}
1298 		}
1299 	}
1300 
1301 	if (recurType == RECURRING_TUPLES_REFERENCE_TABLE)
1302 	{
1303 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1304 							 "cannot push down this subquery",
1305 							 "Reference tables are not supported with union operator",
1306 							 NULL);
1307 	}
1308 	else if (recurType == RECURRING_TUPLES_FUNCTION)
1309 	{
1310 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1311 							 "cannot push down this subquery",
1312 							 "Table functions are not supported with union operator",
1313 							 NULL);
1314 	}
1315 	else if (recurType == RECURRING_TUPLES_EMPTY_JOIN_TREE)
1316 	{
1317 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1318 							 "cannot push down this subquery",
1319 							 "Subqueries without a FROM clause are not supported with "
1320 							 "union operator", NULL);
1321 	}
1322 	else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
1323 	{
1324 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1325 							 "cannot push down this subquery",
1326 							 "Complex subqueries and CTEs are not supported within a "
1327 							 "UNION", NULL);
1328 	}
1329 	else if (recurType == RECURRING_TUPLES_VALUES)
1330 	{
1331 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1332 							 "cannot push down this subquery",
1333 							 "VALUES is not supported within a "
1334 							 "UNION", NULL);
1335 	}
1336 
1337 	return NULL;
1338 }
1339 
1340 
1341 /*
1342  * ExtractSetOperationStatementWalker walks over a set operations statment,
1343  * and finds all set operations in the tree.
1344  */
1345 static bool
ExtractSetOperationStatmentWalker(Node * node,List ** setOperationList)1346 ExtractSetOperationStatmentWalker(Node *node, List **setOperationList)
1347 {
1348 	if (node == NULL)
1349 	{
1350 		return false;
1351 	}
1352 
1353 	if (IsA(node, SetOperationStmt))
1354 	{
1355 		SetOperationStmt *setOperation = (SetOperationStmt *) node;
1356 
1357 		(*setOperationList) = lappend(*setOperationList, setOperation);
1358 	}
1359 
1360 	bool walkerResult = expression_tree_walker(node,
1361 											   ExtractSetOperationStatmentWalker,
1362 											   setOperationList);
1363 
1364 	return walkerResult;
1365 }
1366 
1367 
1368 /*
1369  * RelationInfoContainsOnlyRecurringTuples returns false if any of the relations in
1370  * a RelOptInfo is not recurring.
1371  */
1372 static bool
RelationInfoContainsOnlyRecurringTuples(PlannerInfo * plannerInfo,Relids relids)1373 RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo, Relids relids)
1374 {
1375 	int relationId = -1;
1376 
1377 	while ((relationId = bms_next_member(relids, relationId)) >= 0)
1378 	{
1379 		RangeTblEntry *rangeTableEntry = plannerInfo->simple_rte_array[relationId];
1380 
1381 		if (FindNodeMatchingCheckFunctionInRangeTableList(list_make1(rangeTableEntry),
1382 														  IsDistributedTableRTE))
1383 		{
1384 			/* we already found a distributed table, no need to check further */
1385 			return false;
1386 		}
1387 
1388 		/*
1389 		 * If there are no distributed tables, there should be at least
1390 		 * one recurring rte.
1391 		 */
1392 		RecurringTuplesType recurType PG_USED_FOR_ASSERTS_ONLY;
1393 		Assert(ContainsRecurringRTE(rangeTableEntry, &recurType));
1394 	}
1395 
1396 	return true;
1397 }
1398 
1399 
1400 /*
1401  * FetchFirstRecurType checks whether the relationInfo
1402  * contains any recurring table expression, namely a reference table,
1403  * or immutable function. If found, FetchFirstRecurType
1404  * returns true.
1405  *
1406  * Note that since relation ids of relationInfo indexes to the range
1407  * table entry list of planner info, planner info is also passed.
1408  */
1409 static RecurringTuplesType
FetchFirstRecurType(PlannerInfo * plannerInfo,Relids relids)1410 FetchFirstRecurType(PlannerInfo *plannerInfo, Relids relids)
1411 {
1412 	RecurringTuplesType recurType = RECURRING_TUPLES_INVALID;
1413 	int relationId = -1;
1414 
1415 	while ((relationId = bms_next_member(relids, relationId)) >= 0)
1416 	{
1417 		RangeTblEntry *rangeTableEntry = plannerInfo->simple_rte_array[relationId];
1418 
1419 		/* relationInfo has this range table entry */
1420 		if (ContainsRecurringRTE(rangeTableEntry, &recurType))
1421 		{
1422 			return recurType;
1423 		}
1424 	}
1425 
1426 	return recurType;
1427 }
1428 
1429 
1430 /*
1431  * ContainsRecurringRTE returns whether the range table entry contains
1432  * any entry that generates the same set of tuples when repeating it in
1433  * a query on different shards.
1434  */
1435 static bool
ContainsRecurringRTE(RangeTblEntry * rangeTableEntry,RecurringTuplesType * recurType)1436 ContainsRecurringRTE(RangeTblEntry *rangeTableEntry, RecurringTuplesType *recurType)
1437 {
1438 	return ContainsRecurringRangeTable(list_make1(rangeTableEntry), recurType);
1439 }
1440 
1441 
1442 /*
1443  * ContainsRecurringRangeTable returns whether the range table list contains
1444  * any entry that generates the same set of tuples when repeating it in
1445  * a query on different shards.
1446  */
1447 static bool
ContainsRecurringRangeTable(List * rangeTable,RecurringTuplesType * recurType)1448 ContainsRecurringRangeTable(List *rangeTable, RecurringTuplesType *recurType)
1449 {
1450 	return range_table_walker(rangeTable, HasRecurringTuples, recurType,
1451 							  QTW_EXAMINE_RTES_BEFORE);
1452 }
1453 
1454 
1455 /*
1456  * HasRecurringTuples returns whether any part of the expression will generate
1457  * the same set of tuples in every query on shards when executing a distributed
1458  * query.
1459  */
1460 static bool
HasRecurringTuples(Node * node,RecurringTuplesType * recurType)1461 HasRecurringTuples(Node *node, RecurringTuplesType *recurType)
1462 {
1463 	if (node == NULL)
1464 	{
1465 		return false;
1466 	}
1467 
1468 	if (IsA(node, RangeTblEntry))
1469 	{
1470 		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
1471 
1472 		if (rangeTableEntry->rtekind == RTE_RELATION)
1473 		{
1474 			Oid relationId = rangeTableEntry->relid;
1475 			if (IsCitusTableType(relationId, REFERENCE_TABLE))
1476 			{
1477 				*recurType = RECURRING_TUPLES_REFERENCE_TABLE;
1478 
1479 				/*
1480 				 * Tuples from reference tables will recur in every query on shards
1481 				 * that includes it.
1482 				 */
1483 				return true;
1484 			}
1485 		}
1486 		else if (rangeTableEntry->rtekind == RTE_FUNCTION)
1487 		{
1488 			List *functionList = rangeTableEntry->functions;
1489 
1490 			if (list_length(functionList) == 1 &&
1491 				ContainsReadIntermediateResultFunction((Node *) functionList))
1492 			{
1493 				*recurType = RECURRING_TUPLES_RESULT_FUNCTION;
1494 			}
1495 			else
1496 			{
1497 				*recurType = RECURRING_TUPLES_FUNCTION;
1498 			}
1499 
1500 			/*
1501 			 * Tuples from functions will recur in every query on shards that includes
1502 			 * it.
1503 			 */
1504 			return true;
1505 		}
1506 		else if (rangeTableEntry->rtekind == RTE_RESULT)
1507 		{
1508 			*recurType = RECURRING_TUPLES_EMPTY_JOIN_TREE;
1509 			return true;
1510 		}
1511 		else if (rangeTableEntry->rtekind == RTE_VALUES)
1512 		{
1513 			*recurType = RECURRING_TUPLES_VALUES;
1514 			return true;
1515 		}
1516 
1517 		return false;
1518 	}
1519 	else if (IsA(node, Query))
1520 	{
1521 		Query *query = (Query *) node;
1522 
1523 		if (HasEmptyJoinTree(query))
1524 		{
1525 			*recurType = RECURRING_TUPLES_EMPTY_JOIN_TREE;
1526 
1527 			/*
1528 			 * Queries with empty join trees will recur in every query on shards
1529 			 * that includes it.
1530 			 */
1531 			return true;
1532 		}
1533 
1534 		return query_tree_walker((Query *) node, HasRecurringTuples,
1535 								 recurType, QTW_EXAMINE_RTES_BEFORE);
1536 	}
1537 
1538 	return expression_tree_walker(node, HasRecurringTuples, recurType);
1539 }
1540 
1541 
1542 /*
1543  * SubqueryPushdownMultiNodeTree creates logical plan for subquery pushdown logic.
1544  * Note that this logic will be changed in next iterations, so we decoupled it
1545  * from other parts of code although it causes some code duplication.
1546  *
1547  * Current subquery pushdown support in MultiTree logic requires a single range
1548  * table entry in the top most from clause. Therefore we inject a synthetic
1549  * query derived from the top level query and make it the only range table
1550  * entry for the top level query. This way we can push down any subquery joins
1551  * down to workers without invoking join order planner.
1552  */
1553 static MultiNode *
SubqueryPushdownMultiNodeTree(Query * originalQuery)1554 SubqueryPushdownMultiNodeTree(Query *originalQuery)
1555 {
1556 	Query *queryTree = copyObject(originalQuery);
1557 	List *targetEntryList = queryTree->targetList;
1558 	MultiCollect *subqueryCollectNode = CitusMakeNode(MultiCollect);
1559 
1560 	/* verify we can perform distributed planning on this query */
1561 	DeferredErrorMessage *unsupportedQueryError = DeferErrorIfQueryNotSupported(
1562 		queryTree);
1563 	if (unsupportedQueryError != NULL)
1564 	{
1565 		RaiseDeferredError(unsupportedQueryError, ERROR);
1566 	}
1567 
1568 	/*
1569 	 * We would be creating a new Query and pushing down top level query's
1570 	 * contents down to it. Join and filter clauses in higher level query would
1571 	 * be transferred to lower query. Therefore after this function we would
1572 	 * only have a single range table entry in the top level query. We need to
1573 	 * create a target list entry in lower query for each column reference in
1574 	 * upper level query's target list and having clauses. Any column reference
1575 	 * in the upper query will be updated to have varno=1, and varattno=<resno>
1576 	 * of matching target entry in pushed down query.
1577 	 * Consider query
1578 	 *      SELECT s1.a, sum(s2.c)
1579 	 *      FROM (some subquery) s1, (some subquery) s2
1580 	 *      WHERE s1.a = s2.a
1581 	 *      GROUP BY s1.a
1582 	 *      HAVING avg(s2.b);
1583 	 *
1584 	 * We want to prepare a multi tree to avoid subquery joins at top level,
1585 	 * therefore above query is converted to an equivalent
1586 	 *      SELECT worker_column_0, sum(worker_column_1)
1587 	 *      FROM (
1588 	 *              SELECT
1589 	 *                  s1.a AS worker_column_0,
1590 	 *                  s2.c AS worker_column_1,
1591 	 *                  s2.b AS worker_column_2
1592 	 *              FROM (some subquery) s1, (some subquery) s2
1593 	 *              WHERE s1.a = s2.a) worker_subquery
1594 	 *      GROUP BY worker_column_0
1595 	 *      HAVING avg(worker_column_2);
1596 	 *  After this conversion MultiTree is created as follows
1597 	 *
1598 	 *  MultiExtendedOpNode(
1599 	 *      targetList : worker_column_0, sum(worker_column_1)
1600 	 *      groupBy : worker_column_0
1601 	 *      having :  avg(worker_column_2))
1602 	 * --->MultiProject (worker_column_0, worker_column_1, worker_column_2)
1603 	 * --->--->	MultiTable (subquery : worker_subquery)
1604 	 *
1605 	 * Master and worker queries will be created out of this MultiTree at later stages.
1606 	 */
1607 
1608 	/*
1609 	 * columnList contains all columns returned by subquery. Subquery target
1610 	 * entry list, subquery range table entry's column name list are derived from
1611 	 * columnList. Columns mentioned in multiProject node and multiExtendedOp
1612 	 * node are indexed with their respective position in columnList.
1613 	 */
1614 	List *targetColumnList = pull_vars_of_level((Node *) targetEntryList, 0);
1615 	List *havingClauseColumnList = pull_var_clause_default(queryTree->havingQual);
1616 	List *columnList = list_concat(targetColumnList, havingClauseColumnList);
1617 
1618 	/* create a target entry for each unique column */
1619 	List *subqueryTargetEntryList = CreateSubqueryTargetListAndAdjustVars(columnList);
1620 
1621 	/* new query only has target entries, join tree, and rtable*/
1622 	Query *pushedDownQuery = makeNode(Query);
1623 	pushedDownQuery->commandType = queryTree->commandType;
1624 	pushedDownQuery->targetList = subqueryTargetEntryList;
1625 	pushedDownQuery->jointree = copyObject(queryTree->jointree);
1626 	pushedDownQuery->rtable = copyObject(queryTree->rtable);
1627 	pushedDownQuery->setOperations = copyObject(queryTree->setOperations);
1628 	pushedDownQuery->querySource = queryTree->querySource;
1629 	pushedDownQuery->hasSubLinks = queryTree->hasSubLinks;
1630 
1631 	MultiTable *subqueryNode = MultiSubqueryPushdownTable(pushedDownQuery);
1632 
1633 	SetChild((MultiUnaryNode *) subqueryCollectNode, (MultiNode *) subqueryNode);
1634 	MultiNode *currentTopNode = (MultiNode *) subqueryCollectNode;
1635 
1636 	/* build project node for the columns to project */
1637 	MultiProject *projectNode = MultiProjectNode(targetEntryList);
1638 	SetChild((MultiUnaryNode *) projectNode, currentTopNode);
1639 	currentTopNode = (MultiNode *) projectNode;
1640 
1641 	/*
1642 	 * We build the extended operator node to capture aggregate functions, group
1643 	 * clauses, sort clauses, limit/offset clauses, and expressions. We need to
1644 	 * distinguish between aggregates and expressions; and we address this later
1645 	 * in the logical optimizer.
1646 	 */
1647 	MultiExtendedOp *extendedOpNode = MultiExtendedOpNode(queryTree, originalQuery);
1648 
1649 	/*
1650 	 * Postgres standard planner converts having qual node to a list of and
1651 	 * clauses and expects havingQual to be of type List when executing the
1652 	 * query later. This function is called on an original query, therefore
1653 	 * havingQual has not been converted yet. Perform conversion here.
1654 	 */
1655 	if (extendedOpNode->havingQual != NULL &&
1656 		!IsA(extendedOpNode->havingQual, List))
1657 	{
1658 		extendedOpNode->havingQual =
1659 			(Node *) make_ands_implicit((Expr *) extendedOpNode->havingQual);
1660 	}
1661 
1662 	/*
1663 	 * Group by on primary key allows all columns to appear in the target
1664 	 * list, but once we wrap the join tree into a subquery the GROUP BY
1665 	 * will no longer directly refer to the primary key and referencing
1666 	 * columns that are not in the GROUP BY would result in an error. To
1667 	 * prevent that we wrap all the columns that do not appear in the
1668 	 * GROUP BY in an any_value aggregate.
1669 	 */
1670 	if (extendedOpNode->groupClauseList != NIL)
1671 	{
1672 		extendedOpNode->targetList = (List *) WrapUngroupedVarsInAnyValueAggregate(
1673 			(Node *) extendedOpNode->targetList,
1674 			extendedOpNode->groupClauseList,
1675 			extendedOpNode->targetList, true);
1676 
1677 		extendedOpNode->havingQual = WrapUngroupedVarsInAnyValueAggregate(
1678 			(Node *) extendedOpNode->havingQual,
1679 			extendedOpNode->groupClauseList,
1680 			extendedOpNode->targetList, false);
1681 	}
1682 
1683 	/*
1684 	 * Postgres standard planner evaluates expressions in the LIMIT/OFFSET clauses.
1685 	 * Since we're using original query here, we should manually evaluate the
1686 	 * expression on the LIMIT and OFFSET clauses. Note that logical optimizer
1687 	 * expects those clauses to be already evaluated.
1688 	 */
1689 	extendedOpNode->limitCount =
1690 		PartiallyEvaluateExpression(extendedOpNode->limitCount, NULL);
1691 	extendedOpNode->limitOffset =
1692 		PartiallyEvaluateExpression(extendedOpNode->limitOffset, NULL);
1693 
1694 	SetChild((MultiUnaryNode *) extendedOpNode, currentTopNode);
1695 	currentTopNode = (MultiNode *) extendedOpNode;
1696 
1697 	return currentTopNode;
1698 }
1699 
1700 
1701 /*
1702  * CreateSubqueryTargetListAndAdjustVars creates a target entry for each unique
1703  * column in the column list, adjusts the columns to point into the subquery target
1704  * list and returns the new subquery target list.
1705  */
1706 static List *
CreateSubqueryTargetListAndAdjustVars(List * columnList)1707 CreateSubqueryTargetListAndAdjustVars(List *columnList)
1708 {
1709 	Var *column = NULL;
1710 	List *subqueryTargetEntryList = NIL;
1711 
1712 	foreach_ptr(column, columnList)
1713 	{
1714 		/*
1715 		 * To avoid adding the same column multiple times, we first check whether there
1716 		 * is already a target entry containing a Var with the given varno and varattno.
1717 		 */
1718 		AttrNumber resNo = FindResnoForVarInTargetList(subqueryTargetEntryList,
1719 													   column->varno, column->varattno);
1720 		if (resNo == InvalidAttrNumber)
1721 		{
1722 			/* Var is not yet on the target list, create a new entry */
1723 			resNo = list_length(subqueryTargetEntryList) + 1;
1724 
1725 			/*
1726 			 * The join tree in the subquery is an exact duplicate of the original
1727 			 * query. Hence, we can make a copy of the original Var. However, if the
1728 			 * original Var was in a sublink it would be pointing up whereas now it
1729 			 * will be placed directly on the target list. Hence we reset the
1730 			 * varlevelsup.
1731 			 */
1732 			Var *subqueryTargetListVar = (Var *) copyObject(column);
1733 
1734 			subqueryTargetListVar->varlevelsup = 0;
1735 
1736 			TargetEntry *newTargetEntry = makeNode(TargetEntry);
1737 			newTargetEntry->expr = (Expr *) subqueryTargetListVar;
1738 			newTargetEntry->resname = WorkerColumnName(resNo);
1739 			newTargetEntry->resjunk = false;
1740 			newTargetEntry->resno = resNo;
1741 
1742 			subqueryTargetEntryList = lappend(subqueryTargetEntryList, newTargetEntry);
1743 		}
1744 
1745 		/*
1746 		 * Change the original column reference to point to the target list
1747 		 * entry in the subquery. There is only 1 subquery, so the varno is 1.
1748 		 */
1749 		column->varno = 1;
1750 		column->varattno = resNo;
1751 	}
1752 
1753 	return subqueryTargetEntryList;
1754 }
1755 
1756 
1757 /*
1758  * FindResnoForVarInTargetList finds a Var on a target list that has the given varno
1759  * (range table entry number) and varattno (column number) and returns the resno
1760  * of the target list entry.
1761  */
1762 static AttrNumber
FindResnoForVarInTargetList(List * targetList,int varno,int varattno)1763 FindResnoForVarInTargetList(List *targetList, int varno, int varattno)
1764 {
1765 	TargetEntry *targetEntry = NULL;
1766 	foreach_ptr(targetEntry, targetList)
1767 	{
1768 		if (!IsA(targetEntry->expr, Var))
1769 		{
1770 			continue;
1771 		}
1772 
1773 		Var *targetEntryVar = (Var *) targetEntry->expr;
1774 
1775 		if (targetEntryVar->varno == varno && targetEntryVar->varattno == varattno)
1776 		{
1777 			return targetEntry->resno;
1778 		}
1779 	}
1780 
1781 	return InvalidAttrNumber;
1782 }
1783 
1784 
1785 /*
1786  * MultiSubqueryPushdownTable creates a MultiTable from the given subquery,
1787  * populates column list and returns the multitable.
1788  */
1789 static MultiTable *
MultiSubqueryPushdownTable(Query * subquery)1790 MultiSubqueryPushdownTable(Query *subquery)
1791 {
1792 	StringInfo rteName = makeStringInfo();
1793 	List *columnNamesList = NIL;
1794 	ListCell *targetEntryCell = NULL;
1795 
1796 	appendStringInfo(rteName, "worker_subquery");
1797 
1798 	foreach(targetEntryCell, subquery->targetList)
1799 	{
1800 		TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
1801 		columnNamesList = lappend(columnNamesList, makeString(targetEntry->resname));
1802 	}
1803 
1804 	MultiTable *subqueryTableNode = CitusMakeNode(MultiTable);
1805 	subqueryTableNode->subquery = subquery;
1806 	subqueryTableNode->relationId = SUBQUERY_PUSHDOWN_RELATION_ID;
1807 	subqueryTableNode->rangeTableId = SUBQUERY_RANGE_TABLE_ID;
1808 	subqueryTableNode->partitionColumn = PartitionColumnForPushedDownSubquery(subquery);
1809 	subqueryTableNode->alias = makeNode(Alias);
1810 	subqueryTableNode->alias->aliasname = rteName->data;
1811 	subqueryTableNode->referenceNames = makeNode(Alias);
1812 	subqueryTableNode->referenceNames->aliasname = rteName->data;
1813 	subqueryTableNode->referenceNames->colnames = columnNamesList;
1814 
1815 	return subqueryTableNode;
1816 }
1817 
1818 
1819 /*
1820  * PartitionColumnForPushedDownSubquery finds the partition column on the target
1821  * list of a pushed down subquery.
1822  */
1823 static Var *
PartitionColumnForPushedDownSubquery(Query * query)1824 PartitionColumnForPushedDownSubquery(Query *query)
1825 {
1826 	List *targetEntryList = query->targetList;
1827 
1828 	TargetEntry *targetEntry = NULL;
1829 	foreach_ptr(targetEntry, targetEntryList)
1830 	{
1831 		if (targetEntry->resjunk)
1832 		{
1833 			continue;
1834 		}
1835 
1836 		Expr *targetExpression = targetEntry->expr;
1837 		if (IsA(targetExpression, Var))
1838 		{
1839 			bool skipOuterVars = true;
1840 			bool isPartitionColumn = IsPartitionColumn(targetExpression, query,
1841 													   skipOuterVars);
1842 			if (isPartitionColumn)
1843 			{
1844 				Var *partitionColumn = copyObject((Var *) targetExpression);
1845 
1846 				/* the pushed down subquery is the only range table entry */
1847 				partitionColumn->varno = 1;
1848 
1849 				/* point the var to the position in the subquery target list */
1850 				partitionColumn->varattno = targetEntry->resno;
1851 
1852 				return partitionColumn;
1853 			}
1854 		}
1855 	}
1856 
1857 	return NULL;
1858 }
1859